You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/11/09 21:00:12 UTC
(camel) branch main updated: CAMEL-19905: Add streaming option to camel-platform-http-vertx
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new eefdd528b80 CAMEL-19905: Add streaming option to camel-platform-http-vertx
eefdd528b80 is described below
commit eefdd528b80ec28da81a3b4401704b3dfbdf5d00
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Nov 9 13:39:12 2023 +0000
CAMEL-19905: Add streaming option to camel-platform-http-vertx
---
.../camel/catalog/components/platform-http.json | 13 +-
.../src/main/docs/platform-http-vertx.adoc | 12 +
.../platform/http/vertx/AsyncInputStream.java | 243 +++++++++++++++++++
.../http/vertx/DefaultHttpRequestBodyHandler.java | 50 ++++
.../http/vertx/HttpRequestBodyHandler.java | 50 ++++
.../vertx/StreamingHttpRequestBodyHandler.java | 93 ++++++++
.../http/vertx/VertxPlatformHttpConsumer.java | 142 ++++++-----
.../http/vertx/VertxPlatformHttpSupport.java | 89 +++++--
.../http/vertx/VertxPlatformHttpEngineTest.java | 2 +-
...VertxPlatformHttpLargeMessageStreamingTest.java | 94 ++++++++
.../http/vertx/VertxPlatformHttpProxyTest.java | 10 +-
.../http/vertx/VertxPlatformHttpStreamingTest.java | 264 +++++++++++++++++++++
.../http/PlatformHttpEndpointConfigurer.java | 6 +
.../http/PlatformHttpEndpointUriFactory.java | 3 +-
.../component/platform/http/platform-http.json | 13 +-
.../platform/http/PlatformHttpEndpoint.java | 11 +
.../dsl/PlatformHttpEndpointBuilderFactory.java | 31 +++
17 files changed, 1009 insertions(+), 117 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json
index bb67bb10512..69c9b292526 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json
@@ -33,11 +33,12 @@
"matchOnUriPrefix": { "index": 3, "kind": "parameter", "displayName": "Match On Uri Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found." },
"muteException": { "index": 4, "kind": "parameter", "displayName": "Mute Exception", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If enabled and an Exchange failed processing on the consumer side the response's body won't contain the exception's stack trace." },
"produces": { "index": 5, "kind": "parameter", "displayName": "Produces", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The content type this endpoint produces, such as application\/xml or application\/json." },
- "bridgeErrorHandler": { "index": 6, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...]
- "exceptionHandler": { "index": 7, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
- "exchangePattern": { "index": 8, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
- "fileNameExtWhitelist": { "index": 9, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
- "headerFilterStrategy": { "index": 10, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
- "platformHttpEngine": { "index": 11, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
+ "useStreaming": { "index": 6, "kind": "parameter", "displayName": "Use Streaming", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use streaming for large requests and responses" },
+ "bridgeErrorHandler": { "index": 7, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...]
+ "exceptionHandler": { "index": 8, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
+ "exchangePattern": { "index": 9, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+ "fileNameExtWhitelist": { "index": 10, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
+ "headerFilterStrategy": { "index": 11, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
+ "platformHttpEngine": { "index": 12, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
}
}
diff --git a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc
index 118225964ff..176d5bef976 100644
--- a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc
+++ b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc
@@ -96,3 +96,15 @@ Camel `HttpMessage` as shown in the custom `Processor` below :
});
----
+== Handling large request / response payloads
+
+When large request / response payloads are expected, there is a `useStreaming` option, which can be enabled to improve performance.
+When `useStreaming` is `true`, it will take advantage of xref:manual::stream-caching.adoc[stream caching]. In conjunction with enabling disk spooling, you can avoid having to store the entire request body payload in memory.
+
+[source,java]
+----
+// Handle a large request body and stream it to a file
+from("platform-http:/upload?httpMethodRestrict=POST&useStreaming=true")
+ .log("Processing large request body...")
+ .to("file:/uploads?fileName=uploaded.txt")
+----
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
new file mode 100644
index 00000000000..813dbbe6a85
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.impl.InboundBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ReadStream} that can process an {@link InputStream} in an asynchronous way, so that the content can be pumped
+ * to the {@link io.vertx.core.streams.WriteStream} of an {@link io.vertx.core.http.HttpServerResponse}.
+ */
+public class AsyncInputStream implements ReadStream<Buffer> {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncInputStream.class);
+ private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+ private final ReadableByteChannel channel;
+ private final Vertx vertx;
+ private final Context context;
+ private final InboundBuffer<Buffer> queue;
+ private long readPos;
+ private boolean closed;
+ private boolean readInProgress;
+ private Handler<Buffer> dataHandler;
+ private Handler<Void> endHandler;
+ private Handler<Throwable> exceptionHandler;
+
+ public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream) {
+ this.vertx = vertx;
+ this.context = context;
+ this.channel = Channels.newChannel(inputStream);
+ this.queue = new InboundBuffer<>(context, 0);
+ queue.handler(buffer -> {
+ if (buffer.length() > 0) {
+ handleData(buffer);
+ } else {
+ handleEnd();
+ }
+ });
+ queue.drainHandler(v -> doRead());
+ }
+
+ @Override
+ public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) {
+ checkStreamClosed();
+ this.endHandler = endHandler;
+ return this;
+ }
+
+ @Override
+ public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
+ checkStreamClosed();
+ this.exceptionHandler = exceptionHandler;
+ return this;
+ }
+
+ @Override
+ public synchronized AsyncInputStream handler(Handler<Buffer> handler) {
+ checkStreamClosed();
+ this.dataHandler = handler;
+ if (this.dataHandler != null && !this.closed) {
+ this.doRead();
+ } else {
+ queue.clear();
+ }
+ return this;
+ }
+
+ @Override
+ public synchronized AsyncInputStream pause() {
+ checkStreamClosed();
+ queue.pause();
+ return this;
+ }
+
+ @Override
+ public synchronized AsyncInputStream resume() {
+ checkStreamClosed();
+ queue.resume();
+ return this;
+ }
+
+ @Override
+ public ReadStream<Buffer> fetch(long amount) {
+ checkStreamClosed();
+ queue.fetch(amount);
+ return this;
+ }
+
+ public void close(Handler<AsyncResult<Void>> handler) {
+ closeInternal(handler);
+ }
+
+ private void checkStreamClosed() {
+ if (this.closed) {
+ throw new IllegalStateException("Stream closed");
+ }
+ }
+
+ private void checkContext() {
+ Context contextToCheck = vertx.getOrCreateContext();
+ if (!contextToCheck.equals(context)) {
+ throw new IllegalStateException(
+ "AsyncInputStream must only be used in the context that created it, expected: " + this.context
+ + " actual " + contextToCheck);
+ }
+ }
+
+ private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
+ closed = true;
+ doClose(handler);
+ }
+
+ private void doClose(Handler<AsyncResult<Void>> handler) {
+ try {
+ channel.close();
+ if (handler != null) {
+ this.vertx.runOnContext(v -> handler.handle(Future.succeededFuture()));
+ }
+ } catch (IOException e) {
+ if (handler != null) {
+ this.vertx.runOnContext(v -> handler.handle(Future.failedFuture(e)));
+ }
+ }
+ }
+
+ private void doRead() {
+ checkStreamClosed();
+ doRead(ByteBuffer.allocate(DEFAULT_BUFFER_SIZE));
+ }
+
+ private synchronized void doRead(ByteBuffer buffer) {
+ if (!readInProgress) {
+ readInProgress = true;
+ Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE);
+ doRead(buff, 0, buffer, readPos, result -> {
+ if (result.succeeded()) {
+ readInProgress = false;
+ Buffer updatedBuffer = result.result();
+ readPos += updatedBuffer.length();
+ if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) {
+ doRead(buffer);
+ }
+ } else {
+ handleException(result.cause());
+ }
+ });
+ }
+ }
+
+ private void doRead(Buffer writeBuff, int offset, ByteBuffer buffer, long position, Handler<AsyncResult<Buffer>> handler) {
+ vertx.executeBlocking(() -> channel.read(buffer))
+ .onComplete(result -> {
+ if (result.succeeded()) {
+ Integer bytesRead = result.result();
+ if (bytesRead == -1) {
+ // EOF
+ context.runOnContext((v) -> {
+ buffer.flip();
+ writeBuff.setBytes(offset, buffer);
+ buffer.compact();
+ handler.handle(Future.succeededFuture(writeBuff));
+ });
+ } else if (buffer.hasRemaining()) {
+ // Read from the next offset
+ context.runOnContext((v) -> {
+ doRead(writeBuff, offset, buffer, position + bytesRead, handler);
+ });
+ } else {
+ // All data is written
+ context.runOnContext((v) -> {
+ buffer.flip();
+ writeBuff.setBytes(offset, buffer);
+ buffer.compact();
+ handler.handle(Future.succeededFuture(writeBuff));
+ });
+ }
+ } else {
+ context.runOnContext((v) -> handler.handle(Future.failedFuture(result.cause())));
+ }
+ });
+ }
+
+ private void handleData(Buffer buffer) {
+ Handler<Buffer> handler;
+ synchronized (this) {
+ handler = this.dataHandler;
+ }
+ if (handler != null) {
+ checkContext();
+ handler.handle(buffer);
+ }
+ }
+
+ private synchronized void handleEnd() {
+ Handler<Void> endHandler;
+ synchronized (this) {
+ dataHandler = null;
+ endHandler = this.endHandler;
+ }
+ if (endHandler != null) {
+ checkContext();
+ endHandler.handle(null);
+ }
+ }
+
+ private void handleException(Throwable t) {
+ if (exceptionHandler != null && t instanceof Exception) {
+ exceptionHandler.handle(t);
+ } else {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Unhandled error while processing stream", t);
+ }
+ }
+ }
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java
new file mode 100644
index 00000000000..4f187da7f0a
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.web.RequestBody;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.camel.Message;
+
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData;
+
+/**
+ * Default {@link HttpRequestBodyHandler} that will read to read the entire HTTP request body into memory.
+ */
+class DefaultHttpRequestBodyHandler extends HttpRequestBodyHandler {
+ DefaultHttpRequestBodyHandler(Handler<RoutingContext> delegate) {
+ super(delegate);
+ }
+
+ @Override
+ void configureRoute(Route route) {
+ route.handler(delegate);
+ }
+
+ @Override
+ Future<Void> handle(RoutingContext routingContext, Message message) {
+ if (!isMultiPartFormData(routingContext) && !isFormUrlEncoded(routingContext)) {
+ final RequestBody requestBody = routingContext.body();
+ message.setBody(requestBody.buffer());
+ }
+ return Future.succeededFuture();
+ }
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java
new file mode 100644
index 00000000000..e537612a518
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.camel.Message;
+
+/**
+ * Abstraction to handle HTTP request body processing.
+ */
+abstract class HttpRequestBodyHandler {
+ Handler<RoutingContext> delegate;
+
+ HttpRequestBodyHandler(Handler<RoutingContext> delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Performs any required configuration on a {@link Route}.
+ *
+ * @param route The route to configure
+ */
+ abstract void configureRoute(Route route);
+
+ /**
+ * Processes the incoming HTTP request body.
+ *
+ * @param routingContext The {@link RoutingContext} for the HTTP request being processed
+ * @param message The {@link Message} associated with the HTTP request being processed
+ * @return {@link Future} to determine when the HTTP request body has been fully processed
+ */
+ abstract Future<Void> handle(RoutingContext routingContext, Message message);
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java
new file mode 100644
index 00000000000..88531a6a07e
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.camel.Message;
+import org.apache.camel.converter.stream.CachedOutputStream;
+
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData;
+
+/**
+ * A {@link HttpRequestBodyHandler} that can handle large request bodies via {@link CachedOutputStream}.
+ */
+class StreamingHttpRequestBodyHandler extends HttpRequestBodyHandler {
+ StreamingHttpRequestBodyHandler(Handler<RoutingContext> delegate) {
+ super(delegate);
+ }
+
+ @Override
+ void configureRoute(Route route) {
+ // No configuration necessary for streaming
+ }
+
+ @Override
+ Future<Void> handle(RoutingContext routingContext, Message message) {
+ // Reject multipart requests if streaming enabled as we can't be sure when Vert.x has
+ // fully written the attachments to disk after invoking the default body handler.
+ if (isMultiPartFormData(routingContext)) {
+ return Future.failedFuture(
+ new IllegalStateException("Cannot process multipart/form-data requests when useStreaming=true"));
+ }
+
+ Promise<Void> promise = Promise.promise();
+ HttpServerRequest request = routingContext.request();
+ if (isFormUrlEncoded(routingContext)) {
+ // Delegate body handling to the default body handler
+ delegate.handle(routingContext);
+ request.endHandler(promise::complete);
+ } else {
+ // Process each body 'chunk' and write it to CachedOutputStream
+ CachedOutputStream stream = new CachedOutputStream(message.getExchange(), true);
+ AtomicReference<Exception> failureCause = new AtomicReference<>();
+ request.handler(buffer -> {
+ try {
+ stream.write(buffer.getBytes());
+ } catch (IOException e) {
+ failureCause.set(e);
+ }
+ });
+ // After the body is read, close the CachedOutputStream and get an InputStream to use as the message body
+ request.endHandler(event -> {
+ try {
+ stream.close();
+
+ Exception failure = failureCause.get();
+ if (failure == null) {
+ message.setBody(stream.getInputStream());
+ promise.complete();
+ } else {
+ promise.fail(failure);
+ }
+ } catch (IOException e) {
+ promise.fail(e);
+ }
+ });
+ }
+
+ return promise.future();
+ }
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index d948d5c833a..915c1926110 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -26,15 +26,13 @@ import java.util.regex.Pattern;
import jakarta.activation.DataHandler;
-import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.FileUpload;
-import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.impl.RouteImpl;
@@ -56,6 +54,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.appendHeader;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData;
import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.populateCamelHeaders;
import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.writeResponse;
@@ -74,6 +74,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
private String path;
private Route route;
private VertxPlatformHttpRouter router;
+ private HttpRequestBodyHandler httpRequestBodyHandler;
public VertxPlatformHttpConsumer(PlatformHttpEndpoint endpoint,
Processor processor,
@@ -97,6 +98,11 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
methods = Method.parseList(getEndpoint().getHttpMethodRestrict());
path = configureEndpointPath(getEndpoint());
router = VertxPlatformHttpRouter.lookup(getEndpoint().getCamelContext());
+ if (!getEndpoint().isHttpProxy() && getEndpoint().isUseStreaming()) {
+ httpRequestBodyHandler = new StreamingHttpRequestBodyHandler(router.bodyHandler());
+ } else {
+ httpRequestBodyHandler = new DefaultHttpRequestBodyHandler(router.bodyHandler());
+ }
}
@Override
@@ -126,7 +132,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
}
}
- newRoute.handler(router.bodyHandler());
+ httpRequestBodyHandler.configureRoute(newRoute);
for (Handler<RoutingContext> handler : handlers) {
newRoute.handler(handler);
}
@@ -161,7 +167,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
}
final Vertx vertx = ctx.vertx();
- final Exchange exchange = toExchange(ctx);
+ final Exchange exchange = createExchange(false);
+ exchange.setPattern(ExchangePattern.InOut);
//
// We do not know if any of the processing logic of the route is synchronous or not so we
@@ -180,45 +187,51 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
// .to("rest:get:?bridgeEndpoint=true");
//
- if (getEndpoint().isHttpProxy()) {
- handleProxy(ctx, exchange);
- }
+ // Note: any logic that needs to interrogate HTTP headers not provided by RoutingContext.parsedHeaders, should
+ // be done inside of the following onComplete block, to ensure that the HTTP request is fully processed.
+ processHttpRequest(exchange, ctx).onComplete(result -> {
+ if (result.failed()) {
+ handleFailure(exchange, ctx, result.cause());
+ return;
+ }
- vertx.executeBlocking(() -> processRequest(exchange), false)
- .onComplete(result -> processResult(ctx, result, exchange));
- }
+ if (getEndpoint().isHttpProxy()) {
+ handleProxy(ctx, exchange);
+ }
- private void processResult(
- RoutingContext ctx, AsyncResult<Object> result, Exchange exchange) {
- Throwable failure = null;
- try {
- if (result.succeeded()) {
- try {
- writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy(), muteExceptions);
- } catch (Exception e) {
- failure = e;
+ populateMultiFormData(ctx, exchange.getIn(), getEndpoint().getHeaderFilterStrategy());
+
+ vertx.executeBlocking(() -> processExchange(exchange), false).onComplete(processExchangeResult -> {
+ if (processExchangeResult.succeeded()) {
+ writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy(), muteExceptions)
+ .onComplete(writeResponseResult -> {
+ if (writeResponseResult.succeeded()) {
+ handleExchangeComplete(exchange);
+ } else {
+ handleFailure(exchange, ctx, writeResponseResult.cause());
+ }
+ });
+ } else {
+ handleFailure(exchange, ctx, processExchangeResult.cause());
}
- } else {
- failure = result.cause();
- }
+ });
+ });
+ }
- if (failure != null) {
- handleFailure(ctx, failure);
- }
- } finally {
- doneUoW(exchange);
- releaseExchange(exchange, false);
- }
+ private void handleExchangeComplete(Exchange exchange) {
+ doneUoW(exchange);
+ releaseExchange(exchange, false);
}
- private void handleFailure(RoutingContext ctx, Throwable failure) {
+ private void handleFailure(Exchange exchange, RoutingContext ctx, Throwable failure) {
getExceptionHandler().handleException(
"Failed handling platform-http endpoint " + getEndpoint().getPath(),
failure);
ctx.fail(failure);
+ handleExchangeComplete(exchange);
}
- private Object processRequest(Exchange exchange) throws Exception {
+ private Object processExchange(Exchange exchange) throws Exception {
createUoW(exchange);
getProcessor().process(exchange);
return null;
@@ -236,10 +249,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
exchange.getMessage().removeHeader("Proxy-Connection");
}
- protected Exchange toExchange(RoutingContext ctx) {
- final Exchange exchange = createExchange(false);
- exchange.setPattern(ExchangePattern.InOut);
-
+ protected Future<Void> processHttpRequest(Exchange exchange, RoutingContext ctx) {
// reuse existing http message if pooled
Message in = exchange.getIn();
if (in instanceof HttpMessage hm) {
@@ -248,7 +258,6 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
in = new HttpMessage(exchange, ctx.request(), ctx.response());
exchange.setMessage(in);
}
- populateCamelMessage(ctx, exchange, in);
final String charset = ctx.parsedHeaders().contentType().parameter("charset");
if (charset != null) {
@@ -261,53 +270,38 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
in.setHeader(VertxPlatformHttpConstants.AUTHENTICATED_USER, user);
}
- return exchange;
+ return populateCamelMessage(ctx, exchange, in);
}
- protected void populateCamelMessage(RoutingContext ctx, Exchange exchange, Message result) {
+ protected Future<Void> populateCamelMessage(RoutingContext ctx, Exchange exchange, Message message) {
final HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
- populateCamelHeaders(ctx, result.getHeaders(), exchange, headerFilterStrategy);
- final String mimeType = ctx.parsedHeaders().contentType().value();
- final boolean isMultipartFormData = "multipart/form-data".equals(mimeType);
-
- if ("application/x-www-form-urlencoded".equals(mimeType) || isMultipartFormData) {
- populateMultiFormData(ctx, exchange, result, headerFilterStrategy, isMultipartFormData);
- } else {
- populateDefaultMessage(ctx, result);
- }
- }
-
- private static void populateDefaultMessage(RoutingContext ctx, Message result) {
- final RequestBody requestBody = ctx.body();
- final Buffer body = requestBody.buffer();
- if (body != null) {
- result.setBody(body);
- } else {
- result.setBody(null);
- }
+ populateCamelHeaders(ctx, message.getHeaders(), exchange, headerFilterStrategy);
+ return httpRequestBodyHandler.handle(ctx, message);
}
private void populateMultiFormData(
- RoutingContext ctx, Exchange exchange, Message result, HeaderFilterStrategy headerFilterStrategy,
- boolean isMultipartFormData) {
- final MultiMap formData = ctx.request().formAttributes();
- final Map<String, Object> body = new HashMap<>();
- for (String key : formData.names()) {
- for (String value : formData.getAll(key)) {
- if (headerFilterStrategy != null
- && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
- appendHeader(result.getHeaders(), key, value);
- appendHeader(body, key, value);
+ RoutingContext ctx, Message message, HeaderFilterStrategy headerFilterStrategy) {
+ final boolean isMultipartFormData = isMultiPartFormData(ctx);
+ if (isFormUrlEncoded(ctx) || isMultipartFormData) {
+ final MultiMap formData = ctx.request().formAttributes();
+ final Map<String, Object> body = new HashMap<>();
+ for (String key : formData.names()) {
+ for (String value : formData.getAll(key)) {
+ if (headerFilterStrategy != null
+ && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, message.getExchange())) {
+ appendHeader(message.getHeaders(), key, value);
+ appendHeader(body, key, value);
+ }
}
}
- }
- if (!body.isEmpty()) {
- result.setBody(body);
- }
+ if (!body.isEmpty()) {
+ message.setBody(body);
+ }
- if (isMultipartFormData) {
- populateAttachments(ctx.fileUploads(), result);
+ if (isMultipartFormData) {
+ populateAttachments(ctx.fileUploads(), message);
+ }
}
}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 73e3e22641c..19b14a5fc47 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -27,11 +27,17 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.SocketAddress;
+import io.vertx.core.streams.Pump;
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -189,57 +195,72 @@ public final class VertxPlatformHttpSupport {
return codeToUse;
}
- static void writeResponse(
- RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy, boolean muteExceptions)
- throws Exception {
+ static Future<Void> writeResponse(
+ RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy, boolean muteExceptions) {
final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy, muteExceptions);
- final HttpServerResponse response = ctx.response();
+ final Promise<Void> promise = Promise.promise();
if (body == null) {
LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange);
- response.end();
+ ctx.end();
+ promise.complete();
} else if (body instanceof String) {
- response.end((String) body);
+ ctx.end((String) body);
+ promise.complete();
} else if (body instanceof InputStream) {
- writeResponseAs(response, (InputStream) body);
+ writeResponseAs(promise, ctx, (InputStream) body);
} else if (body instanceof Buffer) {
- response.end((Buffer) body);
+ ctx.end((Buffer) body);
+ promise.complete();
} else {
- writeResponseAsFallback(camelExchange, body, response);
+ try {
+ writeResponseAsFallback(promise, camelExchange, body, ctx);
+ } catch (IOException | NoTypeConversionAvailableException e) {
+ promise.fail(e);
+ }
}
+
+ return promise.future();
}
- private static void writeResponseAsFallback(Exchange camelExchange, Object body, HttpServerResponse response)
+ private static void writeResponseAsFallback(Promise<Void> promise, Exchange camelExchange, Object body, RoutingContext ctx)
throws NoTypeConversionAvailableException, IOException {
final TypeConverter tc = camelExchange.getContext().getTypeConverter();
// Try to convert to ByteBuffer for performance reason
final ByteBuffer bb = tc.tryConvertTo(ByteBuffer.class, camelExchange, body);
if (bb != null) {
- writeResponseAs(response, bb);
+ writeResponseAs(promise, ctx, bb);
} else {
// Otherwise fallback to most generic InputStream conversion
final InputStream is = tc.mandatoryConvertTo(InputStream.class, camelExchange, body);
- writeResponseAs(response, is);
+ writeResponseAs(promise, ctx, is);
}
}
- private static void writeResponseAs(HttpServerResponse response, ByteBuffer bb) {
+ private static void writeResponseAs(Promise<Void> promise, RoutingContext ctx, ByteBuffer bb) {
final Buffer b = Buffer.buffer(bb.capacity());
b.setBytes(0, bb);
- response.end(b);
+ ctx.end(b);
+ promise.complete();
}
- private static void writeResponseAs(HttpServerResponse response, InputStream is) throws IOException {
- final byte[] bytes = new byte[4096];
- try (InputStream in = is) {
- int len;
- while ((len = in.read(bytes)) >= 0) {
- final Buffer b = Buffer.buffer(len);
- b.appendBytes(bytes, 0, len);
- response.write(b);
- }
- }
- response.end();
+ private static void writeResponseAs(Promise<Void> promise, RoutingContext ctx, InputStream is) {
+ HttpServerResponse response = ctx.response();
+ Vertx vertx = ctx.vertx();
+ Context context = vertx.getOrCreateContext();
+
+ // Process the InputStream async to avoid blocking the Vert.x event loop on large responses
+ AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, context, is);
+ asyncInputStream.exceptionHandler(promise::fail);
+ asyncInputStream.endHandler(event -> {
+ response.end().onComplete(result -> {
+ asyncInputStream.close(closeResult -> promise.complete());
+ });
+ });
+
+ // Pump the InputStream content into the HTTP response WriteStream
+ Pump pump = Pump.pump(asyncInputStream, response);
+ context.runOnContext(event -> pump.start());
}
static void populateCamelHeaders(
@@ -336,4 +357,22 @@ public final class VertxPlatformHttpSupport {
value = list;
return value;
}
+
+ static boolean isMultiPartFormData(RoutingContext ctx) {
+ return isContentTypeMatching(ctx, HttpHeaderValues.MULTIPART_FORM_DATA.toString());
+ }
+
+ static boolean isFormUrlEncoded(RoutingContext ctx) {
+ return isContentTypeMatching(ctx, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
+ }
+
+ private static boolean isContentTypeMatching(RoutingContext ctx, String expectedContentType) {
+ String contentType = ctx.parsedHeaders().contentType().value();
+ boolean match = false;
+ if (org.apache.camel.util.ObjectHelper.isNotEmpty(contentType)) {
+ String lowerCaseContentType = contentType.toLowerCase();
+ match = lowerCaseContentType.startsWith(expectedContentType);
+ }
+ return match;
+ }
}
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
index 633826ad959..8b5b5642ebf 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
@@ -880,7 +880,7 @@ public class VertxPlatformHttpEngineTest {
return createCamelContext(null);
}
- private static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception {
+ static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception {
int port = AvailablePortFinder.getNextAvailable();
VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
conf.setBindPort(port);
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java
new file mode 100644
index 00000000000..7c3a118a30e
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.IOHelper;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import static io.restassured.RestAssured.given;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@EnabledIfSystemProperty(named = "performance-tests", matches = ".*")
+public class VertxPlatformHttpLargeMessageStreamingTest {
+
+ @Test
+ void testStreamingWithLargeRequestAndResponseBody() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+ context.getStreamCachingStrategy().setSpoolEnabled(true);
+
+ Path input = createLargeFile();
+ Path output = Files.createTempFile("platform-http-output", "dat");
+
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .log("Done echoing back request body as response body");
+ }
+ });
+
+ context.start();
+
+ InputStream response = given()
+ .body(new FileInputStream(input.toFile()))
+ .post("/streaming")
+ .then()
+ .extract()
+ .asInputStream();
+
+ try (FileOutputStream fos = new FileOutputStream(output.toFile())) {
+ IOHelper.copy(response, fos);
+ }
+
+ assertEquals(input.toFile().length(), output.toFile().length());
+ } finally {
+ context.stop();
+ Files.deleteIfExists(input);
+ Files.deleteIfExists(output);
+ }
+ }
+
+ private Path createLargeFile() throws IOException {
+ // Create a 4GB file containing random data
+ Path path = Files.createTempFile("platform-http-input", "dat");
+ try (FileOutputStream fos = new FileOutputStream(path.toFile())) {
+ Random random = new Random();
+ long targetFileSize = (long) (4 * Math.pow(1024, 3));
+ long bytesWritten = 0L;
+
+ byte[] data = new byte[1024];
+ while (bytesWritten < targetFileSize) {
+ random.nextBytes(data);
+ fos.write(data);
+ bytesWritten += data.length;
+ }
+ }
+ return path;
+ }
+}
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java
index df2a47cdc9d..d940aef8ac3 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java
@@ -25,7 +25,8 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.AvailablePortFinder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
@@ -55,15 +56,16 @@ public class VertxPlatformHttpProxyTest {
}
}
- @Test
- void testProxy() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = { false, true })
+ void testProxy(boolean useStreaming) throws Exception {
final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
try {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
- from("platform-http:proxy")
+ from("platform-http:proxy?useStreaming=" + useStreaming)
.toD("${headers." + Exchange.HTTP_URI + "}?bridgeEndpoint=true");
}
});
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java
new file mode 100644
index 00000000000..786047cbda6
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import io.restassured.http.ContentType;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.IOHelper;
+import org.junit.jupiter.api.Test;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class VertxPlatformHttpStreamingTest {
+
+ @Test
+ void testStreamingWithStringRequestAndResponseBody() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .transform().simple("Hello ${body}");
+ }
+ });
+
+ context.start();
+
+ String requestBody = "Vert.x Platform HTTP";
+ given()
+ .body(requestBody)
+ .post("/streaming")
+ .then()
+ .statusCode(200)
+ .body(is("Hello " + requestBody));
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
+ void testStreamingWithFileRequestAndResponseBody() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+ String content = "Hello World";
+ Path testFile = Files.createTempFile("platform-http-testing", "txt");
+ Files.writeString(testFile, content);
+
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .log("Done processing request");
+ }
+ });
+
+ context.start();
+
+ given()
+ .body(testFile.toFile())
+ .post("/streaming")
+ .then()
+ .statusCode(200)
+ .body(is(content));
+ } finally {
+ context.stop();
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ void testStreamingWithFormUrlEncodedBody() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .setBody().simple("foo = ${header.foo}");
+ }
+ });
+
+ context.start();
+
+ given()
+ .contentType(ContentType.URLENC)
+ .formParam("foo", "bar")
+ .post("/streaming")
+ .then()
+ .statusCode(200)
+ .body(is("foo = bar"));
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
+ void testStreamingWithMultiPartRequestRejected() throws Exception {
+ String content = "Hello World";
+ Path testFile = Files.createTempFile("platform-http-testing", "txt");
+ Files.writeString(testFile, content);
+
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(configuration -> {
+ VertxPlatformHttpServerConfiguration.BodyHandler bodyHandler
+ = new VertxPlatformHttpServerConfiguration.BodyHandler();
+ // turn on file uploads
+ bodyHandler.setHandleFileUploads(true);
+ bodyHandler.setUploadsDirectory(testFile.toFile().getParent());
+ configuration.setBodyHandler(bodyHandler);
+ });
+
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .setBody().constant("multipart request should have been rejected");
+ }
+ });
+
+ context.start();
+
+ given()
+ .multiPart(testFile.toFile())
+ .post("/streaming")
+ .then()
+ .statusCode(500);
+ } finally {
+ context.stop();
+ Files.deleteIfExists(testFile);
+ }
+ }
+
+ @Test
+ void testStreamingWithSpecificEncoding() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+ Path input = Files.createTempFile("platform-http-input", "dat");
+ Path output = Files.createTempFile("platform-http-output", "dat");
+
+ String fileContent = "Content with special character ð";
+ Files.writeString(input, fileContent, StandardCharsets.ISO_8859_1);
+
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .log("Done echoing back request body as response body");
+ }
+ });
+
+ context.start();
+
+ InputStream response = given()
+ .body(new FileInputStream(input.toFile()))
+ .post("/streaming")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .asInputStream();
+
+ try (FileOutputStream fos = new FileOutputStream(output.toFile())) {
+ IOHelper.copy(response, fos);
+ }
+
+ assertEquals(fileContent, Files.readString(output, StandardCharsets.ISO_8859_1));
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
+ void testStreamingWithClosedInputStreamResponse() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // Simulate an error processing an input stream by closing it ahead of the response being written
+ // Verifies the response promise.fail is called correctly
+ InputStream stream = getClass().getResourceAsStream("/authentication/auth.properties");
+ if (stream != null) {
+ stream.close();
+ }
+ exchange.getMessage().setBody(stream);
+ }
+ });
+ }
+ });
+
+ context.start();
+
+ given()
+ .get("/streaming")
+ .then()
+ .statusCode(500);
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
+ void testStreamingWithUnconvertableResponseType() throws Exception {
+ final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("platform-http:/streaming?useStreaming=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // Force a type conversion exception and verify the response promise.fail is called correctly
+ exchange.getMessage().setBody(new TestBean());
+ }
+ });
+ }
+ });
+
+ context.start();
+
+ given()
+ .get("/streaming")
+ .then()
+ .statusCode(500);
+ } finally {
+ context.stop();
+ }
+ }
+
+ static final class TestBean {
+ }
+}
diff --git a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java
index 426e9de52d5..6223fdd295b 100644
--- a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java
+++ b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java
@@ -41,6 +41,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im
case "platformhttpengine":
case "platformHttpEngine": target.setPlatformHttpEngine(property(camelContext, org.apache.camel.component.platform.http.spi.PlatformHttpEngine.class, value)); return true;
case "produces": target.setProduces(property(camelContext, java.lang.String.class, value)); return true;
+ case "usestreaming":
+ case "useStreaming": target.setUseStreaming(property(camelContext, boolean.class, value)); return true;
default: return false;
}
}
@@ -68,6 +70,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im
case "platformhttpengine":
case "platformHttpEngine": return org.apache.camel.component.platform.http.spi.PlatformHttpEngine.class;
case "produces": return java.lang.String.class;
+ case "usestreaming":
+ case "useStreaming": return boolean.class;
default: return null;
}
}
@@ -96,6 +100,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im
case "platformhttpengine":
case "platformHttpEngine": return target.getPlatformHttpEngine();
case "produces": return target.getProduces();
+ case "usestreaming":
+ case "useStreaming": return target.isUseStreaming();
default: return null;
}
}
diff --git a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java
index f680a4e90a8..78a658d5a8a 100644
--- a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java
+++ b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class PlatformHttpEndpointUriFactory extends org.apache.camel.support.com
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(12);
+ Set<String> props = new HashSet<>(13);
props.add("bridgeErrorHandler");
props.add("consumes");
props.add("exceptionHandler");
@@ -34,6 +34,7 @@ public class PlatformHttpEndpointUriFactory extends org.apache.camel.support.com
props.add("path");
props.add("platformHttpEngine");
props.add("produces");
+ props.add("useStreaming");
PROPERTY_NAMES = Collections.unmodifiableSet(props);
SECRET_PROPERTY_NAMES = Collections.emptySet();
MULTI_VALUE_PREFIXES = Collections.emptySet();
diff --git a/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json b/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json
index bb67bb10512..69c9b292526 100644
--- a/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json
+++ b/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json
@@ -33,11 +33,12 @@
"matchOnUriPrefix": { "index": 3, "kind": "parameter", "displayName": "Match On Uri Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found." },
"muteException": { "index": 4, "kind": "parameter", "displayName": "Mute Exception", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If enabled and an Exchange failed processing on the consumer side the response's body won't contain the exception's stack trace." },
"produces": { "index": 5, "kind": "parameter", "displayName": "Produces", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The content type this endpoint produces, such as application\/xml or application\/json." },
- "bridgeErrorHandler": { "index": 6, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...]
- "exceptionHandler": { "index": 7, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
- "exchangePattern": { "index": 8, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
- "fileNameExtWhitelist": { "index": 9, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
- "headerFilterStrategy": { "index": 10, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
- "platformHttpEngine": { "index": 11, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
+ "useStreaming": { "index": 6, "kind": "parameter", "displayName": "Use Streaming", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use streaming for large requests and responses" },
+ "bridgeErrorHandler": { "index": 7, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...]
+ "exceptionHandler": { "index": 8, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
+ "exchangePattern": { "index": 9, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+ "fileNameExtWhitelist": { "index": 10, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
+ "headerFilterStrategy": { "index": 11, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
+ "platformHttpEngine": { "index": 12, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
}
}
diff --git a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java
index 6c3a3e81de1..2b12a69e88c 100644
--- a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java
+++ b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java
@@ -69,6 +69,9 @@ public class PlatformHttpEndpoint extends DefaultEndpoint implements AsyncEndpoi
@UriParam(label = "advanced",
description = "To use a custom HeaderFilterStrategy to filter headers to and from Camel message.")
private HeaderFilterStrategy headerFilterStrategy = new PlatformHttpHeaderFilterStrategy();
+ @UriParam(label = "consumer", defaultValue = "false",
+ description = "Whether to use streaming for large requests and responses")
+ private boolean useStreaming;
public PlatformHttpEndpoint(String uri, String remaining, Component component) {
super(uri, component);
@@ -168,6 +171,14 @@ public class PlatformHttpEndpoint extends DefaultEndpoint implements AsyncEndpoi
this.muteException = muteException;
}
+ public boolean isUseStreaming() {
+ return useStreaming;
+ }
+
+ public void setUseStreaming(boolean useStreaming) {
+ this.useStreaming = useStreaming;
+ }
+
PlatformHttpEngine getOrCreateEngine() {
return platformHttpEngine != null
? platformHttpEngine
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java
index 8a6dd245fcc..c5db0313347 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java
@@ -159,6 +159,37 @@ public interface PlatformHttpEndpointBuilderFactory {
doSetProperty("produces", produces);
return this;
}
+ /**
+ * Whether to use streaming for large requests and responses.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ *
+ * @param useStreaming the value to set
+ * @return the dsl builder
+ */
+ default PlatformHttpEndpointBuilder useStreaming(boolean useStreaming) {
+ doSetProperty("useStreaming", useStreaming);
+ return this;
+ }
+ /**
+ * Whether to use streaming for large requests and responses.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: consumer
+ *
+ * @param useStreaming the value to set
+ * @return the dsl builder
+ */
+ default PlatformHttpEndpointBuilder useStreaming(String useStreaming) {
+ doSetProperty("useStreaming", useStreaming);
+ return this;
+ }
}
/**