You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/08/19 08:54:33 UTC
[camel-k-runtime] branch master updated: Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new 53256bc Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126
53256bc is described below
commit 53256bcdf4f0123be452ad835e17b50965b88f5a
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Aug 14 00:37:21 2019 +0200
Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126
---
camel-k-loader-kotlin/pom.xml | 1 -
camel-knative-http/pom.xml | 11 +-
.../camel/component/knative/http/KnativeHttp.java | 109 +------
.../component/knative/http/KnativeHttpBinding.java | 318 ---------------------
.../knative/http/KnativeHttpClientCallback.java | 242 ----------------
.../knative/http/KnativeHttpComponent.java | 166 ++++++++---
.../knative/http/KnativeHttpConsumer.java | 234 +++++++++++----
.../http/KnativeHttpConsumerDispatcher.java | 193 +++++++++++++
.../knative/http/KnativeHttpDispatcher.java | 181 ------------
.../knative/http/KnativeHttpEndpoint.java | 38 +--
.../http/KnativeHttpHeaderFilterStrategy.java | 3 +-
.../knative/http/KnativeHttpProducer.java | 178 +++++++-----
.../component/knative/http/KnativeHttpSupport.java | 49 ++++
.../component/knative/http/KnativeHttpTest.java | 252 ++++++++--------
.../component/knative/CloudEventsV01Test.java | 14 +-
.../component/knative/CloudEventsV02Test.java | 14 +-
.../component/knative/KnativeComponentTest.java | 126 +-------
pom.xml | 1 +
18 files changed, 829 insertions(+), 1301 deletions(-)
diff --git a/camel-k-loader-kotlin/pom.xml b/camel-k-loader-kotlin/pom.xml
index f7c3b75..7ca24f4 100644
--- a/camel-k-loader-kotlin/pom.xml
+++ b/camel-k-loader-kotlin/pom.xml
@@ -177,7 +177,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <useSystemClassLoader>false</useSystemClassLoader>
<forkCount>0</forkCount>
</configuration>
</plugin>
diff --git a/camel-knative-http/pom.xml b/camel-knative-http/pom.xml
index 08c7a42..44b7e06 100644
--- a/camel-knative-http/pom.xml
+++ b/camel-knative-http/pom.xml
@@ -53,9 +53,14 @@
</dependency>
<dependency>
- <groupId>io.undertow</groupId>
- <artifactId>undertow-core</artifactId>
- <version>${undertow.version}</version>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-web</artifactId>
+ <version>${vertx.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-web-client</artifactId>
+ <version>${vertx.version}</version>
</dependency>
<dependency>
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
index ac5c713..aad1725 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
@@ -17,21 +17,26 @@
package org.apache.camel.component.knative.http;
import java.util.Objects;
-import javax.net.ssl.SSLContext;
+import java.util.regex.Pattern;
+
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpServerRequest;
public final class KnativeHttp {
+ public static final int DEFAULT_PORT = 8080;
+ public static final String DEFAULT_PATH = "/";
+ public static final Pattern ENDPOINT_PATTERN = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)");
+
private KnativeHttp() {
}
- public static final class HostKey {
+ public static final class ServerKey {
private final String host;
private final int port;
- private final SSLContext sslContext;
- public HostKey(String host, int port, SSLContext ssl) {
+ public ServerKey(String host, int port) {
this.host = host;
this.port = port;
- this.sslContext = ssl;
}
public String getHost() {
@@ -42,10 +47,6 @@ public final class KnativeHttp {
return port;
}
- public SSLContext getSslContext() {
- return sslContext;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -54,9 +55,8 @@ public final class KnativeHttp {
if (o == null || getClass() != o.getClass()) {
return false;
}
- HostKey key = (HostKey) o;
- return getPort() == key.getPort()
- && getHost().equals(key.getHost());
+ ServerKey key = (ServerKey) o;
+ return getPort() == key.getPort() && getHost().equals(key.getHost());
}
@Override
@@ -65,88 +65,7 @@ public final class KnativeHttp {
}
}
-
- /**
- * Options to configure an Undertow host.
- */
- public static final class HostOptions {
-
- /**
- * The number of worker threads to use in a Undertow host.
- */
- private Integer workerThreads;
-
- /**
- * The number of io threads to use in a Undertow host.
- */
- private Integer ioThreads;
-
- /**
- * The buffer size of the Undertow host.
- */
- private Integer bufferSize;
-
- /**
- * Set if the Undertow host should use direct buffers.
- */
- private Boolean directBuffers;
-
- /**
- * Set if the Undertow host should use http2 protocol.
- */
- private Boolean http2Enabled;
-
-
- public Integer getWorkerThreads() {
- return workerThreads;
- }
-
- public void setWorkerThreads(Integer workerThreads) {
- this.workerThreads = workerThreads;
- }
-
- public Integer getIoThreads() {
- return ioThreads;
- }
-
- public void setIoThreads(Integer ioThreads) {
- this.ioThreads = ioThreads;
- }
-
- public Integer getBufferSize() {
- return bufferSize;
- }
-
- public void setBufferSize(Integer bufferSize) {
- this.bufferSize = bufferSize;
- }
-
- public Boolean getDirectBuffers() {
- return directBuffers;
- }
-
- public void setDirectBuffers(Boolean directBuffers) {
- this.directBuffers = directBuffers;
- }
-
- public Boolean getHttp2Enabled() {
- return http2Enabled;
- }
-
- public void setHttp2Enabled(Boolean http2Enabled) {
- this.http2Enabled = http2Enabled;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder("UndertowHostOptions{");
- sb.append("workerThreads=").append(workerThreads);
- sb.append(", ioThreads=").append(ioThreads);
- sb.append(", bufferSize=").append(bufferSize);
- sb.append(", directBuffers=").append(directBuffers);
- sb.append(", http2Enabled=").append(http2Enabled);
- sb.append('}');
- return sb.toString();
- }
+ public interface PredicatedHandler extends Handler<HttpServerRequest> {
+ boolean canHandle(HttpServerRequest event);
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java
deleted file mode 100644
index 8dd6f34..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-
-import io.undertow.client.ClientExchange;
-import io.undertow.client.ClientRequest;
-import io.undertow.client.ClientResponse;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.Headers;
-import io.undertow.util.HttpString;
-import io.undertow.util.Methods;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.TypeConverter;
-import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.DefaultMessage;
-import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ObjectHelper;
-import org.apache.camel.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xnio.channels.BlockingReadableByteChannel;
-import org.xnio.channels.StreamSourceChannel;
-
-public final class KnativeHttpBinding {
- private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpBinding.class);
-
- private final HeaderFilterStrategy headerFilterStrategy;
- private final Boolean transferException;
-
- public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
- this(headerFilterStrategy, Boolean.FALSE);
- }
-
- public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy, Boolean transferException) {
- this.headerFilterStrategy = Objects.requireNonNull(headerFilterStrategy, "headerFilterStrategy");
- this.transferException = transferException;
- }
-
- public Message toCamelMessage(HttpServerExchange httpExchange, Exchange exchange) throws Exception {
- Message result = new DefaultMessage(exchange.getContext());
-
- populateCamelHeaders(httpExchange, result.getHeaders(), exchange);
-
- //extract body by myself if undertow parser didn't handle and the method is allowed to have one
- //body is extracted as byte[] then auto TypeConverter kicks in
- if (Methods.POST.equals(httpExchange.getRequestMethod()) || Methods.PUT.equals(httpExchange.getRequestMethod()) || Methods.PATCH.equals(httpExchange.getRequestMethod())) {
- result.setBody(readFromChannel(httpExchange.getRequestChannel()));
- } else {
- result.setBody(null);
- }
-
- return result;
- }
-
- public Message toCamelMessage(ClientExchange clientExchange, Exchange exchange) throws Exception {
- Message result = new DefaultMessage(exchange.getContext());
-
- //retrieve response headers
- populateCamelHeaders(clientExchange.getResponse(), result.getHeaders(), exchange);
-
- result.setBody(readFromChannel(clientExchange.getResponseChannel()));
-
- return result;
- }
-
- public void populateCamelHeaders(HttpServerExchange httpExchange, Map<String, Object> headersMap, Exchange exchange) {
- String path = httpExchange.getRequestPath();
- KnativeHttpEndpoint endpoint = (KnativeHttpEndpoint) exchange.getFromEndpoint();
- if (endpoint.getHttpURI() != null) {
- // need to match by lower case as we want to ignore case on context-path
- String endpointPath = endpoint.getHttpURI().getPath();
- String matchPath = path.toLowerCase(Locale.US);
- String match = endpointPath.toLowerCase(Locale.US);
- if (matchPath.startsWith(match)) {
- path = path.substring(endpointPath.length());
- }
- }
- headersMap.put(Exchange.HTTP_PATH, path);
-
- for (HttpString name : httpExchange.getRequestHeaders().getHeaderNames()) {
- if (name.toString().toLowerCase(Locale.US).equals("content-type")) {
- name = Headers.CONTENT_TYPE;
- }
-
- if (name.toString().toLowerCase(Locale.US).equals("authorization")) {
- String value = httpExchange.getRequestHeaders().get(name).toString();
- // store a special header that this request was authenticated using HTTP Basic
- if (value != null && value.trim().startsWith("Basic")) {
- if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
- appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
- }
- }
- }
-
- // add the headers one by one, and use the header filter strategy
- for (Object value : httpExchange.getRequestHeaders().get(name)) {
- if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
- appendHeader(headersMap, name.toString(), value);
- }
- }
- }
-
- //process uri parameters as headers
- Map<String, Deque<String>> pathParameters = httpExchange.getQueryParameters();
- for (Map.Entry<String, Deque<String>> entry : pathParameters.entrySet()) {
- String name = entry.getKey();
- for (Object value: entry.getValue()) {
- if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
- appendHeader(headersMap, name, value);
- }
- }
- }
-
- headersMap.put(Exchange.HTTP_METHOD, httpExchange.getRequestMethod().toString());
- headersMap.put(Exchange.HTTP_URL, httpExchange.getRequestURL());
- headersMap.put(Exchange.HTTP_URI, httpExchange.getRequestURI());
- headersMap.put(Exchange.HTTP_QUERY, httpExchange.getQueryString());
- headersMap.put(Exchange.HTTP_RAW_QUERY, httpExchange.getQueryString());
- }
-
- public void populateCamelHeaders(ClientResponse response, Map<String, Object> headersMap, Exchange exchange) throws Exception {
- headersMap.put(Exchange.HTTP_RESPONSE_CODE, response.getResponseCode());
-
- for (HttpString name : response.getResponseHeaders().getHeaderNames()) {
- // mapping the content-type
- //String name = httpName.toString();
- if (name.toString().toLowerCase(Locale.US).equals("content-type")) {
- name = Headers.CONTENT_TYPE;
- }
-
- if (name.toString().toLowerCase(Locale.US).equals("authorization")) {
- String value = response.getResponseHeaders().get(name).toString();
- // store a special header that this request was authenticated using HTTP Basic
- if (value != null && value.trim().startsWith("Basic")) {
- if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
- appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
- }
- }
- }
-
- // add the headers one by one, and use the header filter strategy
- for (Object value : response.getResponseHeaders().get(name)) {
- if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
- appendHeader(headersMap, name.toString(), value);
- }
- }
- }
- }
-
- public Object toHttpResponse(HttpServerExchange httpExchange, Message message) throws IOException {
- final boolean failed = message.getExchange().isFailed();
- final int defaultCode = failed ? 500 : 200;
- final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
- final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
-
- httpExchange.setStatusCode(code);
-
- //copy headers from Message to Response
- for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
- String key = entry.getKey();
- Object value = entry.getValue();
- // use an iterator as there can be multiple values. (must not use a delimiter)
- for (Object it: ObjectHelper.createIterable(value, null)) {
- String headerValue = tc.convertTo(String.class, it);
- if (headerValue == null) {
- continue;
- }
- if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
- httpExchange.getResponseHeaders().add(new HttpString(key), headerValue);
- }
- }
- }
-
- Object body = message.getBody();
- Exception exception = message.getExchange().getException();
-
- if (exception != null) {
- if (transferException) {
- // we failed due an exception, and transfer it as java serialized object
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(exception);
- oos.flush();
- IOHelper.close(oos, bos);
-
- // the body should be the serialized java object of the exception
- body = ByteBuffer.wrap(bos.toByteArray());
- // force content type to be serialized java object
- message.setHeader(Exchange.CONTENT_TYPE, "application/x-java-serialized-object");
- } else {
- // we failed due an exception so print it as plain text
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- exception.printStackTrace(pw);
-
- // the body should then be the stacktrace
- body = ByteBuffer.wrap(sw.toString().getBytes());
- // force content type to be text/plain as that is what the stacktrace is
- message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
- }
-
- // and mark the exception as failure handled, as we handled it by returning it as the response
- ExchangeHelper.setFailureHandled(message.getExchange());
- }
-
- // set the content type in the response.
- String contentType = MessageHelper.getContentType(message);
- if (contentType != null) {
- // set content-type
- httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType);
- LOG.trace("Content-Type: {}", contentType);
- }
- return body;
- }
-
- public Object toHttpRequest(ClientRequest clientRequest, Message message) {
- final Object body = message.getBody();
- final HeaderMap requestHeaders = clientRequest.getRequestHeaders();
-
- // set the content type in the response.
- String contentType = MessageHelper.getContentType(message);
- if (contentType != null) {
- // set content-type
- requestHeaders.put(Headers.CONTENT_TYPE, contentType);
- }
-
- TypeConverter tc = message.getExchange().getContext().getTypeConverter();
-
- //copy headers from Message to Request
- for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
- String key = entry.getKey();
- Object value = entry.getValue();
- // use an iterator as there can be multiple values. (must not use a delimiter)
- for (Object it: ObjectHelper.createIterable(value, null)) {
- String headerValue = tc.convertTo(String.class, it);
- if (headerValue == null) {
- continue;
- }
- if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
- requestHeaders.add(new HttpString(key), headerValue);
- }
- }
- }
-
- return body;
- }
-
- @SuppressWarnings("unchecked")
- public static void appendHeader(Map<String, Object> headers, String key, Object value) {
- if (headers.containsKey(key)) {
- Object existing = headers.get(key);
- List<Object> list;
- if (existing instanceof List) {
- list = (List<Object>) existing;
- } else {
- list = new ArrayList<>();
- list.add(existing);
- }
- list.add(value);
- value = list;
- }
-
- headers.put(key, value);
- }
-
- public static byte[] readFromChannel(StreamSourceChannel source) throws IOException {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- final ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
- final ReadableByteChannel blockingSource = new BlockingReadableByteChannel(source);
-
- for (;;) {
- int res = blockingSource.read(buffer);
- if (res == -1) {
- return out.toByteArray();
- } else if (res == 0) {
- LOG.error("Channel did not block");
- } else {
- buffer.flip();
- out.write(
- buffer.array(),
- buffer.arrayOffset() + buffer.position(),
- buffer.arrayOffset() + buffer.limit());
- buffer.clear();
- }
- }
- }
-}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java
deleted file mode 100644
index 4adf801..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import io.undertow.client.ClientCallback;
-import io.undertow.client.ClientConnection;
-import io.undertow.client.ClientExchange;
-import io.undertow.client.ClientRequest;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.http.common.HttpHelper;
-import org.apache.camel.http.common.HttpOperationFailedException;
-import org.apache.camel.support.ExchangeHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xnio.ChannelExceptionHandler;
-import org.xnio.ChannelListener;
-import org.xnio.ChannelListeners;
-import org.xnio.IoUtils;
-import org.xnio.channels.StreamSinkChannel;
-
-
-class KnativeHttpClientCallback implements ClientCallback<ClientConnection> {
- private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpClientCallback.class);
-
- private final ByteBuffer body;
- private final AsyncCallback callback;
- private final BlockingDeque<Closeable> closeables;
- private final KnativeHttpEndpoint endpoint;
- private final Exchange exchange;
- private final ClientRequest request;
-
- KnativeHttpClientCallback(Exchange exchange, AsyncCallback callback, KnativeHttpEndpoint endpoint, ClientRequest request, ByteBuffer body) {
- this.closeables = new LinkedBlockingDeque<>();
- this.exchange = exchange;
- this.callback = callback;
- this.endpoint = endpoint;
- this.request = request;
- this.body = body;
- }
-
- @Override
- public void completed(final ClientConnection connection) {
- // we have established connection, make sure we close it
- deferClose(connection);
-
- // now we can send the request and perform the exchange: writing the
- // request and reading the response
- connection.sendRequest(request, on(this::performClientExchange));
- }
-
- @Override
- public void failed(final IOException e) {
- hasFailedWith(e);
- }
-
- private ChannelListener<StreamSinkChannel> asyncWriter(final ByteBuffer body) {
- return channel -> {
- try {
- write(channel, body);
-
- if (body.hasRemaining()) {
- channel.resumeWrites();
- } else {
- flush(channel);
- }
- } catch (final IOException e) {
- hasFailedWith(e);
- }
- };
- }
-
- private void deferClose(final Closeable closeable) {
- try {
- closeables.putFirst(closeable);
- } catch (final InterruptedException e) {
- hasFailedWith(e);
- }
- }
-
- private void finish(final Message result) {
- for (final Closeable closeable : closeables) {
- IoUtils.safeClose(closeable);
- }
-
- if (result != null) {
- if (ExchangeHelper.isOutCapable(exchange)) {
- exchange.setOut(result);
- } else {
- exchange.setIn(result);
- }
- }
-
- callback.done(false);
- }
-
- private void hasFailedWith(final Throwable e) {
- LOG.trace("Exchange has failed with", e);
- if (Boolean.TRUE.equals(endpoint.getThrowExceptionOnFailure())) {
- exchange.setException(e);
- }
-
- finish(null);
- }
-
- private <T> ClientCallback<T> on(final Consumer<T> completed) {
- return on(completed, this::hasFailedWith);
- }
-
- private <T> ClientCallback<T> on(Consumer<T> completed, Consumer<IOException> failed) {
- return new ClientCallback<T>() {
- @Override
- public void completed(final T result) {
- completed.accept(result);
- }
-
- @Override
- public void failed(final IOException e) {
- failed(e);
- }
- };
- }
-
- private void performClientExchange(final ClientExchange clientExchange) {
- // add response listener to the exchange, we could receive the response
- // at any time (async)
- setupResponseListener(clientExchange);
-
- // write the request
- writeRequest(clientExchange, body);
- }
-
- private void setupResponseListener(final ClientExchange clientExchange) {
- clientExchange.setResponseListener(on(response -> {
- try {
- final KnativeHttpBinding binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
- final Message result = binding.toCamelMessage(clientExchange, exchange);
- final int code = clientExchange.getResponse().getResponseCode();
-
- if (!HttpHelper.isStatusCodeOk(code, "200-299") && endpoint.getThrowExceptionOnFailure()) {
- // operation failed so populate exception to throw
- final String uri = endpoint.getHttpURI().toString();
- final String statusText = clientExchange.getResponse().getStatus();
-
- // Convert Message headers (Map<String, Object>) to Map<String, String> as expected by
- // HttpOperationsFailedException using Message versus clientExchange as its header values
- // have extra formatting
- final Map<String, String> headers = result.getHeaders().entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().toString()));
-
- // Since result (Message) isn't associated with an Exchange yet, you can not use result.getBody(String.class)
- final String bodyText = ExchangeHelper.convertToType(exchange, String.class, result.getBody());
- final Exception cause = new HttpOperationFailedException(uri, code, statusText, null, headers, bodyText);
-
- if (ExchangeHelper.isOutCapable(exchange)) {
- exchange.setOut(result);
- } else {
- exchange.setIn(result);
- }
-
- // make sure to fail with HttpOperationFailedException
- hasFailedWith(cause);
- } else {
- // we end Camel exchange here
- finish(result);
- }
- } catch (Throwable e) {
- hasFailedWith(e);
- }
- }));
- }
-
- private void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) {
- final StreamSinkChannel requestChannel = clientExchange.getRequestChannel();
- if (body != null) {
- try {
- // try writing, we could be on IO thread and ready to write to
- // the socket (or not)
- write(requestChannel, body);
-
- if (body.hasRemaining()) {
- // we did not write all of body (or at all) register a write
- // listener to write asynchronously
- requestChannel.getWriteSetter().set(asyncWriter(body));
- requestChannel.resumeWrites();
- } else {
- // we are done, we need to flush the request
- flush(requestChannel);
- }
- } catch (final IOException e) {
- hasFailedWith(e);
- }
- }
- }
-
- private static void flush(final StreamSinkChannel channel) throws IOException {
- // the canonical way of flushing Xnio channels
- channel.shutdownWrites();
- if (!channel.flush()) {
- final ChannelListener<StreamSinkChannel> safeClose = IoUtils::safeClose;
- final ChannelExceptionHandler<Channel> closingChannelExceptionHandler = ChannelListeners
- .closingChannelExceptionHandler();
- final ChannelListener<StreamSinkChannel> flushingChannelListener = ChannelListeners
- .flushingChannelListener(safeClose, closingChannelExceptionHandler);
- channel.getWriteSetter().set(flushingChannelListener);
- channel.resumeWrites();
- }
- }
-
- private static void write(final StreamSinkChannel channel, final ByteBuffer body) throws IOException {
- int written = 1;
- while (body.hasRemaining() && written > 0) {
- written = channel.write(body);
- }
- }
-}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
index bea70a5..24edb27 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
@@ -17,94 +17,188 @@
package org.apache.camel.component.knative.http;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import io.undertow.predicate.Predicate;
-import io.undertow.server.HttpHandler;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.client.WebClientOptions;
import org.apache.camel.Endpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.IntrospectionSupport;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component("knative-http")
public class KnativeHttpComponent extends DefaultComponent {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class);
- private final Map<KnativeHttp.HostKey, KnativeHttpDispatcher> registry;
+ private final Map<KnativeHttp.ServerKey, KnativeHttpConsumerDispatcher> registry;
+
+ @Metadata(label = "advanced")
+ private Vertx vertx;
+ @Metadata(label = "advanced")
+ private VertxOptions vertxOptions;
@Metadata(label = "advanced")
- private KnativeHttp.HostOptions hostOptions;
+ private HttpServerOptions vertxHttpServerOptions;
+ @Metadata(label = "advanced")
+ private WebClientOptions vertxHttpClientOptions;
+
+ private boolean localVertx;
+ private ExecutorService executor;
public KnativeHttpComponent() {
this.registry = new ConcurrentHashMap<>();
+ this.localVertx = false;
}
@Override
- protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- final Pattern pattern = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)");
- final Matcher matcher = pattern.matcher(remaining);
+ protected void doInit() throws Exception {
+ super.doInit();
+
+ this.executor = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-component");
+
+ if (this.vertx == null) {
+ Set<Vertx> instances = getCamelContext().getRegistry().findByType(Vertx.class);
+ if (instances.size() == 1) {
+ this.vertx = instances.iterator().next();
+ }
+ }
+ if (this.vertx == null) {
+ VertxOptions options = ObjectHelper.supplyIfEmpty(this.vertxOptions, VertxOptions::new);
+
+ this.vertx = Vertx.vertx(options);
+ this.localVertx = true;
+ }
+ }
+
+    /**
+     * Shuts the component down: closes the Vert.x instance when (and only when) it was
+     * created by this component, then releases the component executor.
+     *
+     * @throws Exception if the parent shutdown fails or the Vert.x instance cannot be closed
+     */
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+
+        try {
+            if (this.vertx != null && this.localVertx) {
+                Future<?> future = this.executor.submit((Runnable) this::closeLocalVertx);
+
+                try {
+                    future.get();
+                } finally {
+                    this.vertx = null;
+                    this.localVertx = false;
+                }
+            }
+        } finally {
+            // release the executor even when closing Vert.x failed
+            if (this.executor != null) {
+                getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
+            }
+        }
+    }
+
+    /**
+     * Closes the Vert.x instance and blocks the calling thread until the close callback
+     * has been invoked.
+     *
+     * @throws RuntimeException if the close fails or the wait is interrupted
+     */
+    private void closeLocalVertx() {
+        final CountDownLatch latch = new CountDownLatch(1);
+        // written by the callback before countDown(), read after await() returns
+        final Throwable[] failure = new Throwable[1];
+
+        this.vertx.close(result -> {
+            try {
+                if (result.failed()) {
+                    LOGGER.warn("Failed to close Vert.x reason: {}",
+                        result.cause().getMessage()
+                    );
+
+                    // an exception thrown from this callback would never reach the
+                    // waiting thread, so record the cause and rethrow it after await()
+                    failure[0] = result.cause();
+                } else {
+                    LOGGER.info("Vert.x stopped");
+                }
+            } finally {
+                latch.countDown();
+            }
+        });
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+
+        if (failure[0] != null) {
+            throw new RuntimeException(failure[0]);
+        }
+    }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ Matcher matcher = KnativeHttp.ENDPOINT_PATTERN.matcher(remaining);
if (!matcher.find()) {
throw new IllegalArgumentException("Bad URI: " + remaining);
}
- final String host;
- final int port;
- final String path;
+ KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this);
+ ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true));
switch (matcher.groupCount()) {
case 1:
- host = matcher.group(1);
- port = 8080;
- path = "/";
+ ep.setHost(matcher.group(1));
+ ep.setPort(KnativeHttp.DEFAULT_PORT);
+ ep.setPath(KnativeHttp.DEFAULT_PATH);
break;
case 2:
- host = matcher.group(1);
- port = Integer.parseInt(matcher.group(2));
- path = "/";
+ ep.setHost(matcher.group(1));
+ ep.setPort(Integer.parseInt(matcher.group(2)));
+ ep.setPath(KnativeHttp.DEFAULT_PATH);
break;
case 3:
- host = matcher.group(1);
- port = Integer.parseInt(matcher.group(2));
- path = "/" + matcher.group(3);
+ ep.setHost(matcher.group(1));
+ ep.setPort(Integer.parseInt(matcher.group(2)));
+ ep.setPath(KnativeHttp.DEFAULT_PATH + matcher.group(3));
break;
default:
throw new IllegalArgumentException("Bad URI: " + remaining);
}
- KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this);
- ep.setHost(host);
- ep.setPort(port);
- ep.setPath(path);
- ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true));
-
setProperties(ep, parameters);
return ep;
}
- public KnativeHttp.HostOptions getHostOptions() {
- return hostOptions;
+ public Vertx getVertx() {
+ return vertx;
}
- public void setHostOptions(KnativeHttp.HostOptions hostOptions) {
- this.hostOptions = hostOptions;
+ public void setVertx(Vertx vertx) {
+ this.vertx = vertx;
}
- public void bind(KnativeHttp.HostKey key, HttpHandler handler, Predicate predicate) {
- getUndertow(key).bind(handler, predicate);
+ public VertxOptions getVertxOptions() {
+ return vertxOptions;
}
- public void unbind(KnativeHttp.HostKey key, HttpHandler handler) {
- getUndertow(key).unbind(handler);
+ public void setVertxOptions(VertxOptions vertxOptions) {
+ this.vertxOptions = vertxOptions;
+ }
+
+ public HttpServerOptions getVertxHttpServerOptions() {
+ return vertxHttpServerOptions;
+ }
+
+ public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) {
+ this.vertxHttpServerOptions = vertxHttpServerOptions;
+ }
+
+ public WebClientOptions getVertxHttpClientOptions() {
+ return vertxHttpClientOptions;
+ }
+
+ public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) {
+ this.vertxHttpClientOptions = vertxHttpClientOptions;
+ }
+ KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) {
+ return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions));
}
- private KnativeHttpDispatcher getUndertow(KnativeHttp.HostKey key) {
- return registry.computeIfAbsent(key, k -> new KnativeHttpDispatcher(k, hostOptions));
+ ExecutorService getExecutorService() {
+ return this.executor;
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index 26f057b..a9bbedf 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -16,31 +16,57 @@
*/
package org.apache.camel.component.knative.http;
-import java.nio.ByteBuffer;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Predicate;
-import io.undertow.predicate.Predicates;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.HttpString;
-import io.undertow.util.MimeMappings;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.MessageHelper;
import org.apache.camel.util.ObjectHelper;
-public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler {
- private final KnativeHttpBinding binding;
+public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler {
+ private final Predicate<HttpServerRequest> filter;
+    /**
+     * Creates a consumer whose request filter accepts a request only when its path equals
+     * the endpoint path and every configured header filter matches the request headers.
+     *
+     * @param endpoint the endpoint providing the path and the header filters
+     * @param processor the processor the received exchanges are handed to
+     */
     public KnativeHttpConsumer(KnativeHttpEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+        filter = v -> {
+            if (!Objects.equals(endpoint.getPath(), v.path())) {
+                return false;
+            }
+            if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) {
+                return true;
+            }
+
+            for (Map.Entry<String, Object> entry : endpoint.getHeaderFilter().entrySet()) {
+                String ref = entry.getValue().toString();
+                String val = v.getHeader(entry.getKey());
+
+                // a request that does not carry the header cannot match the filter;
+                // checking for null first avoids a NullPointerException on val.matches
+                boolean matches = val != null && (val.equals(ref) || val.matches(ref));
+
+                if (!matches) {
+                    return false;
+                }
+            }
+
+            return true;
+        };
     }
@Override
@@ -52,30 +78,9 @@ public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler
protected void doStart() throws Exception {
final KnativeHttpEndpoint endpoint = getEndpoint();
final KnativeHttpComponent component = endpoint.getComponent();
- final KnativeHttp.HostKey key = endpoint.getHostKey();
-
- component.bind(key, this, Predicates.and(
- Predicates.path(endpoint.getPath()),
- value -> {
- if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) {
- return true;
- }
-
- HeaderMap hm = value.getRequestHeaders();
+ final KnativeHttp.ServerKey key = endpoint.getServerKey();
- for (Map.Entry<String, Object> entry: endpoint.getHeaderFilter().entrySet()) {
- String ref = entry.getValue().toString();
- String val = hm.getFirst(entry.getKey());
- boolean matches = Objects.equals(ref, val) || val.matches(ref);
-
- if (!matches) {
- return false;
- }
- }
-
- return true;
- }
- ));
+ component.getDispatcher(key).bind(this);
super.doStart();
}
@@ -84,49 +89,152 @@ public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler
protected void doStop() throws Exception {
final KnativeHttpEndpoint endpoint = getEndpoint();
final KnativeHttpComponent component = endpoint.getComponent();
- final KnativeHttp.HostKey key = endpoint.getHostKey();
+ final KnativeHttp.ServerKey key = endpoint.getServerKey();
- component.unbind(key, this);
+ component.getDispatcher(key).unbind(this);
super.doStop();
}
@Override
- public void handleRequest(HttpServerExchange httpExchange) throws Exception {
- //create new Exchange
- //binding is used to extract header and payload(if available)
- Exchange camelExchange = createExchange(httpExchange);
-
- //Unit of Work to process the Exchange
- createUoW(camelExchange);
- try {
- getProcessor().process(camelExchange);
- } catch (Exception e) {
- getExceptionHandler().handleException(e);
- } finally {
- doneUoW(camelExchange);
+ public boolean canHandle(HttpServerRequest request) {
+ return filter.test(request);
+ }
+
+    /**
+     * Handles a request dispatched to this consumer.
+     *
+     * <p>POST requests are turned into an InOut exchange whose body is the request payload;
+     * the exchange is processed asynchronously and the resulting message is written back
+     * as the HTTP response. Any other method is answered with a 405.
+     *
+     * @param request the incoming request
+     * @throws IllegalArgumentException if the request method is not POST (raised after the
+     *         405 response has been sent)
+     */
+    @Override
+    public void handle(HttpServerRequest request) {
+        if (request.method() == HttpMethod.POST) {
+            final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
+            final Message in = toMessage(request, exchange);
+
+            request.bodyHandler(buffer -> {
+                in.setBody(buffer.getBytes());
+
+                exchange.setIn(in);
+
+                try {
+                    createUoW(exchange);
+                    getAsyncProcessor().process(exchange, doneSync -> {
+                        try {
+                            HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
+                            Buffer body = computeResponseBody(exchange.getMessage());
+
+                            // set the content type in the response.
+                            String contentType = MessageHelper.getContentType(exchange.getMessage());
+                            if (contentType != null) {
+                                // set content-type
+                                response.putHeader(Exchange.CONTENT_TYPE, contentType);
+                            }
+
+                            if (body == null) {
+                                request.response().setStatusCode(204);
+                                request.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
+                                request.response().end("No response available");
+                            } else {
+                                request.response().end(body);
+                            }
+                        } catch (Exception e) {
+                            getExceptionHandler().handleException(e);
+                            endWithServerError(request);
+                        } finally {
+                            // the unit of work is complete only once the (possibly
+                            // asynchronous) processing has finished and the response
+                            // has been produced
+                            doneUoW(exchange);
+                        }
+                    });
+                } catch (Exception e) {
+                    // reached when the exchange could not be handed over to the
+                    // processor, in which case the callback above is not expected to run
+                    getExceptionHandler().handleException(e);
+                    doneUoW(exchange);
+                    endWithServerError(request);
+                }
+            });
+        } else {
+            request.response().setStatusCode(405);
+            request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain");
+            request.response().end("Unsupported method");
+
+            throw new IllegalArgumentException("Unsupported method: " + request.method());
+        }
+    }
+
+    /**
+     * Best-effort termination of a request whose processing failed, so that the caller
+     * is not left waiting for a response that will never be written.
+     */
+    private void endWithServerError(HttpServerRequest request) {
+        final HttpServerResponse response = request.response();
+
+        if (response.ended() || response.closed()) {
+            return;
+        }
+        if (!response.headWritten()) {
+            // the status line can only be changed while the head has not been sent
+            response.setStatusCode(500);
+        }
+
+        response.end();
+    }
+
+ private Message toMessage(HttpServerRequest request, Exchange exchange) {
+ KnativeHttpEndpoint endpoint = getEndpoint();
+ Message message = new DefaultMessage(exchange.getContext());
+ String path = request.path();
+
+ if (endpoint.getPath() != null) {
+ String endpointPath = endpoint.getPath();
+ String matchPath = path.toLowerCase(Locale.US);
+ String match = endpointPath.toLowerCase(Locale.US);
+
+ if (matchPath.startsWith(match)) {
+ path = path.substring(endpointPath.length());
+ }
+ }
+
+ for (Map.Entry<String, String> entry : request.headers().entries()) {
+ if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
+ }
+ }
+ for (Map.Entry<String, String> entry : request.params().entries()) {
+ if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
+ }
}
- Object body = binding.toHttpResponse(httpExchange, camelExchange.getMessage());
- TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter();
+ message.setHeader(Exchange.HTTP_PATH, path);
+ message.setHeader(Exchange.HTTP_METHOD, request.method());
+ message.setHeader(Exchange.HTTP_URI, request.uri());
+ message.setHeader(Exchange.HTTP_QUERY, request.query());
- if (body == null) {
- httpExchange.getResponseHeaders().put(new HttpString(Exchange.CONTENT_TYPE), MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt"));
- httpExchange.getResponseSender().send("No response available");
- } else {
- ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body);
- httpExchange.getResponseSender().send(bodyAsByteBuffer);
+ return message;
+ }
+
+ private HttpServerResponse toHttpResponse(HttpServerRequest request, Message message) {
+ final HttpServerResponse response = request.response();
+ final boolean failed = message.getExchange().isFailed();
+ final int defaultCode = failed ? 500 : 200;
+ final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
+ final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
+
+ response.setStatusCode(code);
+
+ for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+ final String key = entry.getKey();
+ final Object value = entry.getValue();
+
+ for (Object it: org.apache.camel.support.ObjectHelper.createIterable(value, null)) {
+ String headerValue = tc.convertTo(String.class, it);
+ if (headerValue == null) {
+ continue;
+ }
+ if (!getEndpoint().getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+ response.putHeader(key, headerValue);
+ }
+ }
}
+
+ return response;
}
- public Exchange createExchange(HttpServerExchange httpExchange) throws Exception {
- Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
- Message in = binding.toCamelMessage(httpExchange, exchange);
+ private Buffer computeResponseBody(Message message) throws NoTypeConversionAvailableException {
+ Object body = message.getBody();
+ Exception exception = message.getExchange().getException();
+
+ if (exception != null) {
+ // we failed due an exception so print it as plain text
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ exception.printStackTrace(pw);
- exchange.setProperty(Exchange.CHARSET_NAME, httpExchange.getRequestCharset());
- in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, httpExchange.getRequestCharset());
+ // the body should then be the stacktrace
+ body = sw.toString().getBytes(StandardCharsets.UTF_8);
+ // force content type to be text/plain as that is what the stacktrace is
+ message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
+
+ // and mark the exception as failure handled, as we handled it by returning
+ // it as the response
+ ExchangeHelper.setFailureHandled(message.getExchange());
+ }
- exchange.setIn(in);
- return exchange;
+ return Buffer.buffer(
+ message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body)
+ );
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
new file mode 100644
index 0000000..446dc6b
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ReferenceCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches the requests received by one Vert.x {@link HttpServer}, identified by a
+ * {@link KnativeHttp.ServerKey}, to the {@link KnativeHttp.PredicatedHandler}s bound to it.
+ *
+ * <p>Binding and unbinding a handler retains and releases a {@link ReferenceCount} that was
+ * created with the server's start and stop operations as callbacks.
+ */
+public final class KnativeHttpConsumerDispatcher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumerDispatcher.class);
+
+    private final Vertx vertx;
+    private final KnativeHttp.ServerKey key;
+    private final ReferenceCount refCnt;
+    private final Set<KnativeHttp.PredicatedHandler> handlers;
+    private final HttpServerWrapper server;
+    private final HttpServerOptions serverOptions;
+    private final ExecutorService executor;
+
+    /**
+     * @param executor the executor on which the blocking start/stop of the server is run
+     * @param vertx the Vert.x instance used to create the HTTP server
+     * @param key the host/port the server listens on
+     * @param serverOptions the server options; defaults are used when empty
+     */
+    public KnativeHttpConsumerDispatcher(ExecutorService executor, Vertx vertx, KnativeHttp.ServerKey key, HttpServerOptions serverOptions) {
+        this.executor = executor;
+        this.vertx = vertx;
+        this.serverOptions = ObjectHelper.supplyIfEmpty(serverOptions, HttpServerOptions::new);
+        this.server = new HttpServerWrapper();
+
+        this.handlers = new CopyOnWriteArraySet<>();
+        this.key = key;
+        this.refCnt = ReferenceCount.on(server::start, server::stop);
+    }
+
+    /**
+     * Registers a handler; a handler that is already registered is ignored.
+     */
+    public void bind(KnativeHttp.PredicatedHandler handler) {
+        if (handlers.add(handler)) {
+            refCnt.retain();
+        }
+    }
+
+    /**
+     * Removes a handler; a handler that is not registered is ignored.
+     */
+    public void unbind(KnativeHttp.PredicatedHandler handler) {
+        if (handlers.remove(handler)) {
+            refCnt.release();
+        }
+    }
+
+    /**
+     * Wraps the Vert.x server life cycle as a Camel service and acts as the server's
+     * request handler.
+     */
+    private final class HttpServerWrapper extends ServiceSupport implements Handler<HttpServerRequest> {
+        private HttpServer server;
+
+        @Override
+        protected void doStart() throws Exception {
+            LOGGER.info("Starting Vert.x HttpServer on {}:{}",
+                key.getHost(),
+                key.getPort()
+            );
+
+            startAsync().toCompletableFuture().join();
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            LOGGER.info("Stopping Vert.x HttpServer on {}:{}",
+                key.getHost(),
+                key.getPort());
+
+            try {
+                if (server != null) {
+                    stopAsync().toCompletableFuture().join();
+                }
+            } finally {
+                this.server = null;
+            }
+        }
+
+        /**
+         * Creates the server and starts listening; the returned stage completes
+         * exceptionally when the server fails to bind.
+         */
+        private CompletionStage<Void> startAsync() {
+            server = vertx.createHttpServer(serverOptions);
+            server.requestHandler(this);
+
+            return CompletableFuture.runAsync(
+                () -> {
+                    final CountDownLatch latch = new CountDownLatch(1);
+                    final Throwable[] failure = new Throwable[1];
+
+                    server.listen(key.getPort(), key.getHost(), result -> {
+                        try {
+                            if (result.failed()) {
+                                LOGGER.warn("Failed to start Vert.x HttpServer on {}:{}, reason: {}",
+                                    key.getHost(),
+                                    key.getPort(),
+                                    result.cause().getMessage()
+                                );
+
+                                // throwing here would be lost on the Vert.x thread,
+                                // hand the cause over to the waiting thread instead
+                                failure[0] = result.cause();
+                            } else {
+                                LOGGER.info("Vert.x HttpServer started on {}:{}", key.getHost(), key.getPort());
+                            }
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+
+                    awaitOutcome(latch, failure);
+                },
+                executor
+            );
+        }
+
+        /**
+         * Closes the server; the returned stage completes exceptionally when the close
+         * operation fails.
+         */
+        protected CompletionStage<Void> stopAsync() {
+            return CompletableFuture.runAsync(
+                () -> {
+                    final CountDownLatch latch = new CountDownLatch(1);
+                    final Throwable[] failure = new Throwable[1];
+
+                    server.close(result -> {
+                        try {
+                            if (result.failed()) {
+                                LOGGER.warn("Failed to close Vert.x HttpServer reason: {}",
+                                    result.cause().getMessage()
+                                );
+
+                                failure[0] = result.cause();
+                            } else {
+                                LOGGER.info("Vert.x HttpServer stopped");
+                            }
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+
+                    awaitOutcome(latch, failure);
+                },
+                executor
+            );
+        }
+
+        /**
+         * Waits for the callback to signal the latch, then rethrows the failure it
+         * recorded, if any. The failure slot is written before countDown() and read
+         * after await() returns.
+         */
+        private void awaitOutcome(CountDownLatch latch, Throwable[] failure) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+
+            if (failure[0] != null) {
+                throw new RuntimeException(failure[0]);
+            }
+        }
+
+        /**
+         * Hands the request to the first bound handler that accepts it, or answers
+         * with a 404 when none does.
+         */
+        @Override
+        public void handle(HttpServerRequest request) {
+            LOGGER.debug("received exchange on path: {}, headers: {}",
+                request.path(),
+                request.headers()
+            );
+
+            for (KnativeHttp.PredicatedHandler handler: handlers) {
+                if (handler.canHandle(request)) {
+                    handler.handle(request);
+                    return;
+                }
+            }
+
+            LOGGER.warn("No handler found for path: {}, headers: {}",
+                request.path(),
+                request.headers()
+            );
+
+            HttpServerResponse response = request.response();
+            response.setStatusCode(404);
+            response.putHeader(Exchange.CONTENT_TYPE, "text/plain");
+            response.end("No matching condition found");
+        }
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java
deleted file mode 100644
index 2d774cd..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import io.undertow.Undertow;
-import io.undertow.UndertowOptions;
-import io.undertow.predicate.Predicate;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.server.handlers.PathHandler;
-import io.undertow.util.Headers;
-import io.undertow.util.StatusCodes;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ReferenceCount;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class KnativeHttpDispatcher implements HttpHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpDispatcher.class);
-
- private final KnativeHttp.HostKey key;
- private final KnativeHttp.HostOptions options;
- private final ReferenceCount refCnt;
- private final Set<PredicatedHandlerWrapper> handlers;
- private final Undertow undertow;
- private final PathHandler handler;
-
- public KnativeHttpDispatcher(KnativeHttp.HostKey key, KnativeHttp.HostOptions option) {
- this.handlers = new CopyOnWriteArraySet<>();
- this.key = key;
- this.options = option;
- this.handler = new PathHandler(this);
- this.undertow = createUndertow();
- this.refCnt = ReferenceCount.on(this::startUndertow, this::stopUndertow);
- }
-
- @Override
- public void handleRequest(HttpServerExchange exchange) throws Exception {
- LOGGER.debug("received exchange on path: {}, headers: {}",
- exchange.getRelativePath(),
- exchange.getRequestHeaders()
- );
-
- for (PredicatedHandlerWrapper handler: handlers) {
- if (handler.dispatch(exchange)) {
- return;
- }
- }
-
- exchange.setStatusCode(StatusCodes.NOT_FOUND);
- exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
- exchange.getResponseSender().send("No matching condition found");
- }
-
- public void bind(HttpHandler handler, Predicate predicate) {
- if (handlers.add(new PredicatedHandlerWrapper(handler, predicate))) {
- refCnt.retain();
- }
- }
-
- public void unbind(HttpHandler handler) {
- if (handlers.removeIf(phw -> phw.handler == handler)) {
- refCnt.release();
- }
- }
-
- private void startUndertow() {
- try {
- LOGGER.info("Starting Undertow server on {}://{}:{}}",
- key.getSslContext() != null ? "https" : "http",
- key.getHost(),
- key.getPort()
- );
-
- undertow.start();
- } catch (RuntimeException e) {
- LOGGER.warn("Failed to start Undertow server on {}://{}:{}, reason: {}",
- key.getSslContext() != null ? "https" : "http",
- key.getHost(),
- key.getPort(),
- e.getMessage()
- );
-
- undertow.stop();
-
- throw e;
- }
- }
-
- private void stopUndertow() {
- LOGGER.info("Stopping Undertow server on {}://{}:{}",
- key.getSslContext() != null ? "https" : "http",
- key.getHost(),
- key.getPort());
-
- undertow.stop();
- }
-
- private Undertow createUndertow() {
- Undertow.Builder builder = Undertow.builder();
- if (key.getSslContext() != null) {
- builder.addHttpsListener(key.getPort(), key.getHost(), key.getSslContext());
- } else {
- builder.addHttpListener(key.getPort(), key.getHost());
- }
-
- if (options != null) {
- ObjectHelper.ifNotEmpty(options.getIoThreads(), builder::setIoThreads);
- ObjectHelper.ifNotEmpty(options.getWorkerThreads(), builder::setWorkerThreads);
- ObjectHelper.ifNotEmpty(options.getBufferSize(), builder::setBufferSize);
- ObjectHelper.ifNotEmpty(options.getDirectBuffers(), builder::setDirectBuffers);
- ObjectHelper.ifNotEmpty(options.getHttp2Enabled(), e -> builder.setServerOption(UndertowOptions.ENABLE_HTTP2, e));
- }
-
- return builder.setHandler(new PathHandler(handler)).build();
- }
-
- private static final class PredicatedHandlerWrapper {
- private final HttpHandler handler;
- private final Predicate predicate;
-
- public PredicatedHandlerWrapper(HttpHandler handler, Predicate predicate) {
- this.handler = ObjectHelper.notNull(handler, "handler");
- this.predicate = ObjectHelper.notNull(predicate, "predicate");
- }
-
- boolean dispatch(HttpServerExchange exchange) throws Exception {
- if (predicate.resolve(exchange)) {
- if (exchange.isInIoThread()) {
- exchange.dispatch(handler);
- } else {
- handler.handleRequest(exchange);
- }
-
- return true;
- }
-
- LOGGER.debug("No handler for path: {}, headers: {}",
- exchange.getRelativePath(),
- exchange.getRequestHeaders()
- );
-
- return false;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PredicatedHandlerWrapper holder = (PredicatedHandlerWrapper) o;
- return handler.equals(holder.handler);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(handler);
- }
- }
-}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
index e02f736..e9d8c2b 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.knative.http;
-import java.net.URI;
import java.util.Map;
import org.apache.camel.Consumer;
@@ -27,10 +26,9 @@ import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.jsse.SSLContextParameters;
-import org.apache.camel.util.ObjectHelper;
-import org.xnio.OptionMap;
@UriEndpoint(
firstVersion = "3.0.0",
@@ -65,12 +63,6 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
}
- // **********************************
- //
- // Properties
- //
- // **********************************
-
public String getHost() {
return host;
}
@@ -93,6 +85,10 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
public void setPath(String path) {
this.path = path;
+
+ if (!this.path.startsWith("/")) {
+ this.path = "/" + path;
+ }
}
public HeaderFilterStrategy getHeaderFilterStrategy() {
@@ -127,26 +123,10 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
this.throwExceptionOnFailure = throwExceptionOnFailure;
}
- public KnativeHttp.HostKey getHostKey() {
- return new KnativeHttp.HostKey(host, port, null);
+ public KnativeHttp.ServerKey getServerKey() {
+ return new KnativeHttp.ServerKey(host, port);
}
- public URI getHttpURI() {
- String uri = "http://" + host + ":" + port;
-
- if (ObjectHelper.isNotEmpty(path)) {
- uri += path;
- }
-
- return URI.create(uri);
- }
-
- // **********************************
- //
- // Impl
- //
- // **********************************
-
@Override
public KnativeHttpComponent getComponent() {
return (KnativeHttpComponent)super.getComponent();
@@ -154,11 +134,11 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new KnativeHttpProducer(this, OptionMap.EMPTY);
+ return new KnativeHttpProducer(this, getComponent().getVertx(), getComponent().getVertxHttpClientOptions());
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new KnativeHttpConsumer(this, processor);
+ return new KnativeHttpConsumer(this, AsyncProcessorConverterHelper.convert(processor));
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
index 96051c9..db3d165 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
@@ -19,12 +19,11 @@ package org.apache.camel.component.knative.http;
import org.apache.camel.support.DefaultHeaderFilterStrategy;
public class KnativeHttpHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
-
public KnativeHttpHeaderFilterStrategy() {
initialize();
}
- protected void initialize() {
+ protected final void initialize() {
getOutFilter().add("content-length");
getOutFilter().add("content-type");
getOutFilter().add("host");
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
index 7d79ccd..c4aafed 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -16,44 +16,40 @@
*/
package org.apache.camel.component.knative.http;
-import java.net.URI;
-import java.nio.ByteBuffer;
-
-import io.undertow.client.ClientRequest;
-import io.undertow.client.UndertowClient;
-import io.undertow.protocols.ssl.UndertowXnioSsl;
-import io.undertow.server.DefaultByteBufferPool;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.Headers;
-import io.undertow.util.Methods;
+import java.util.Map;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
-import org.apache.camel.TypeConverter;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.http.common.HttpOperationFailedException;
import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.jsse.SSLContextParameters;
-import org.apache.camel.util.URISupport;
+import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.xnio.OptionMap;
-import org.xnio.Xnio;
-import org.xnio.XnioWorker;
-import org.xnio.ssl.XnioSsl;
public class KnativeHttpProducer extends DefaultAsyncProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class);
- private final OptionMap options;
- private final KnativeHttpBinding binding;
+ private final Vertx vertx;
+ private final WebClientOptions clientOptions;
+ private WebClient client;
- private UndertowClient client;
- private DefaultByteBufferPool pool;
- private XnioSsl ssl;
- private XnioWorker worker;
-
- public KnativeHttpProducer(KnativeHttpEndpoint endpoint, OptionMap options) {
+ public KnativeHttpProducer(KnativeHttpEndpoint endpoint, Vertx vertx, WebClientOptions clientOptions) {
super(endpoint);
- this.options = options;
- this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+
+ this.vertx = ObjectHelper.notNull(vertx, "vertx");
+ this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new);
}
@Override
@@ -62,70 +58,128 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
}
@Override
- public boolean process(final Exchange camelExchange, final AsyncCallback callback) {
- final KnativeHttpEndpoint endpoint = getEndpoint();
- final URI uri = endpoint.getHttpURI();
- final String pathAndQuery = URISupport.pathAndQueryOf(uri);
-
- final ClientRequest request = new ClientRequest();
- request.setMethod(Methods.POST);
- request.setPath(pathAndQuery);
- request.getRequestHeaders().put(Headers.HOST, uri.getHost());
-
- final Object body = binding.toHttpRequest(request, camelExchange.getIn());
- final TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
- final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body);
-
- // As tryConvertTo is used to convert the body, we should do null check
- // or the call bodyAsByte.remaining() may throw an NPE
- if (body != null && bodyAsByte != null) {
- request.getRequestHeaders().put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
- }
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // Sends the message body as an HTTP POST to the endpoint. The call completes
+        // asynchronously (returns false) unless the body cannot be converted to bytes.
+        final byte[] payload;
- // when connect succeeds or fails UndertowClientCallback will
- // get notified on a I/O thread run by Xnio worker. The writing
- // of request and reading of response is performed also in the
- // callback
- client.connect(
- new KnativeHttpClientCallback(camelExchange, callback, getEndpoint(), request, bodyAsByte),
- uri,
- worker,
- ssl,
- pool,
- options);
-
- // the call above will proceed on Xnio I/O thread we will
- // notify the exchange asynchronously when the HTTP exchange
- // ends with success or failure from UndertowClientCallback
- return false;
- }
+        try {
+            payload = exchange.getMessage().getMandatoryBody(byte[].class);
+        } catch (InvalidPayloadException e) {
+            exchange.setException(e);
+            callback.done(true);
- @Override
- protected void doStart() throws Exception {
- super.doStart();
+            return true;
+        }
- final Xnio xnio = Xnio.getInstance();
+        KnativeHttpEndpoint endpoint = getEndpoint();
+        Message message = exchange.getMessage();
- pool = new DefaultByteBufferPool(true, 17 * 1024);
- worker = xnio.createWorker(options);
+        MultiMap headers = MultiMap.caseInsensitiveMultiMap();
+        headers.add(HttpHeaders.HOST, endpoint.getHost());
+        headers.add(HttpHeaders.CONTENT_LENGTH, Integer.toString(payload.length));
- SSLContextParameters sslContext = getEndpoint().getSslContextParameters();
- if (sslContext != null) {
- ssl = new UndertowXnioSsl(xnio, options, sslContext.createSSLContext(getEndpoint().getCamelContext()));
+        String contentType = MessageHelper.getContentType(message);
+        if (contentType != null) {
+            headers.add(HttpHeaders.CONTENT_TYPE, contentType);
}
- client = UndertowClient.getInstance();
+        for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+            // a Camel header may hold a null value: skip it rather than fail on toString()
+            if (entry.getValue() == null) {
+                continue;
+            }
+            if (!endpoint.getHeaderFilterStrategy().applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
+                headers.add(entry.getKey(), entry.getValue().toString());
+            }
+        }
+
+        client.post(endpoint.getPort(), endpoint.getHost(), endpoint.getPath())
+            .putHeaders(headers)
+            .sendBuffer(Buffer.buffer(payload), response -> {
+                // a failed AsyncResult carries no HTTP response (result() is null), so
+                // report the cause and complete the exchange instead of dereferencing it
+                if (response.failed()) {
+                    exchange.setException(response.cause());
+                    callback.done(false);
+
+                    return;
+                }
+
+                HttpResponse<Buffer> result = response.result();
+
+                Message answer = new DefaultMessage(exchange.getContext());
+                answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
+
+                for (Map.Entry<String, String> entry : result.headers().entries()) {
+                    if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+                        // response headers go on the answer, which replaces the request message below
+                        KnativeHttpSupport.appendHeader(answer.getHeaders(), entry.getKey(), entry.getValue());
+                    }
+                }
+
+                // expose the response payload to the route and to the failure exception below
+                if (result.body() != null) {
+                    answer.setBody(result.body().getBytes());
+                }
+
+                exchange.setMessage(answer);
+
+                // the request reached the server at this point, so failure is decided by the
+                // HTTP status: anything outside 2xx is reported when the endpoint asks for it
+                boolean success = result.statusCode() >= 200 && result.statusCode() < 300;
+                if (!success && endpoint.getThrowExceptionOnFailure()) {
+                    Exception cause = new HttpOperationFailedException(
+                        getURI(),
+                        result.statusCode(),
+                        result.statusMessage(),
+                        null,
+                        KnativeHttpSupport.asStringMap(answer.getHeaders()),
+                        ExchangeHelper.convertToType(exchange, String.class, answer.getBody())
+                    );
- LOGGER.debug("Created worker: {} with options: {}", worker, options);
+                    exchange.setException(cause);
+                }
+
+                callback.done(false);
+            });
+
+        return false;
+    }
+
+    // The client is created on start (not on init) because doStop() closes and clears it:
+    // pairing creation with doStart() keeps the producer usable after a stop/start cycle.
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        this.client = WebClient.create(vertx, clientOptions);
+    }
@Override
protected void doStop() throws Exception {
super.doStop();
- if (worker != null && !worker.isShutdown()) {
- LOGGER.debug("Shutting down worker: {}", worker);
- worker.shutdown();
+        if (this.client != null) {
+            LOGGER.debug("Shutting down client: {}", client);
+            this.client.close();
+            this.client = null;
+        }
+    }
+
+    /**
+     * Builds the {@code http://host:port/path} string reported in failure exceptions,
+     * using the default path when none is set and adding a missing leading slash.
+     */
+    private String getURI() {
+        String p = getEndpoint().getPath();
+
+        if (p == null) {
+            p = KnativeHttp.DEFAULT_PATH;
+        } else if (!p.startsWith("/")) {
+            p = "/" + p;
}
+
+        return String.format("http://%s:%d%s", getEndpoint().getHost(), getEndpoint().getPort(), p);
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
new file mode 100644
index 0000000..a858f75
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Static helpers for manipulating message header maps.
+ */
+public final class KnativeHttpSupport {
+    private KnativeHttpSupport() {
+    }
+
+    /**
+     * Adds a value for the given header, keeping any value already stored under the same key.
+     * <p>
+     * If the key is absent the value is stored as is; otherwise the previous value and the new
+     * one are stored together in a {@link List}. An existing list is copied rather than updated
+     * in place, so a caller-owned (possibly unmodifiable) list is never mutated.
+     *
+     * @param headers the header map to update
+     * @param key the header name
+     * @param value the value to add
+     */
+    public static void appendHeader(Map<String, Object> headers, String key, Object value) {
+        if (headers.containsKey(key)) {
+            Object existing = headers.get(key);
+            List<Object> list;
+            if (existing instanceof List) {
+                list = new ArrayList<>((List<?>) existing);
+            } else {
+                list = new ArrayList<>();
+                list.add(existing);
+            }
+            list.add(value);
+            value = list;
+        }
+
+        headers.put(key, value);
+    }
+
+    /**
+     * Returns a copy of the given map with every value converted through {@code toString()}.
+     * Entries whose value is {@code null} are left out, as they have no string form and
+     * {@link Collectors#toMap} rejects null values.
+     *
+     * @param map the map to convert
+     * @return a new map holding the string form of each non-null value
+     */
+    public static Map<String, String> asStringMap(Map<String, Object> map) {
+        return map.entrySet().stream()
+            .filter(e -> e.getValue() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    }
+}
diff --git a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index e0d1ab6..0b40284 100644
--- a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.knative.http;
import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -29,7 +28,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class KnativeHttpTest {
@@ -51,7 +49,7 @@ public class KnativeHttpTest {
}
@AfterEach
- public void after() throws Exception {
+ public void after() {
if (this.context != null) {
this.context.stop();
}
@@ -65,150 +63,164 @@ public class KnativeHttpTest {
@Test
void testWithPaths() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("knative-http:0.0.0.0:%d/a/1", port)
- .routeId("r1")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class)
- .to("mock:r1");
- fromF("knative-http:0.0.0.0:%d/a/2", port)
- .routeId("r2")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class)
- .to("mock:r2");
- from("direct:start")
- .toD("undertow:http://localhost:" + port + "/a/${body}");
- }
- }
- );
-
- MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class);
- m1.expectedMessageCount(1);
- m1.expectedBodiesReceived("r1");
-
- MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class);
- m2.expectedMessageCount(1);
- m2.expectedBodiesReceived("r2");
-
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("knative-http:0.0.0.0:%d/a/1", port)
+ .routeId("r1")
+ .setBody().simple("${routeId}")
+ .to("mock:r1");
+ b.fromF("knative-http:0.0.0.0:%d/a/2", port)
+ .routeId("r2")
+ .setBody().simple("${routeId}")
+ .to("mock:r2");
+
+ b.from("direct:start")
+ .toD("undertow:http://localhost:" + port + "/a/${body}");
+ });
+
+ context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1);
+ context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1);
context.start();
- assertThat(
- template.requestBody("direct:start", "1", String.class)
- ).isEqualTo("r1");
- assertThat(
- template.requestBody("direct:start", "2", String.class)
- ).isEqualTo("r2");
+ assertThat(template.requestBody("direct:start", "1", String.class)).isEqualTo("r1");
+ assertThat(template.requestBody("direct:start", "2", String.class)).isEqualTo("r2");
- m1.assertIsSatisfied();
- m2.assertIsSatisfied();
+ MockEndpoint.assertIsSatisfied(context);
}
@Test
void testWithFilters() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port)
- .routeId("r1")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class)
- .to("mock:r1");
- fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port)
- .routeId("r2")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class)
- .to("mock:r2");
- fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port)
- .routeId("r3")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class)
- .to("mock:r3");
- from("direct:start")
- .setHeader("MyHeader").body()
- .toF("undertow:http://localhost:%d", port);
- }
- }
- );
-
- MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class);
- m1.expectedMessageCount(1);
- m1.expectedBodiesReceived("r1");
-
- MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class);
- m2.expectedMessageCount(1);
- m2.expectedBodiesReceived("r2");
-
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port)
+ .routeId("r1")
+ .setBody().simple("${routeId}")
+ .to("mock:r1");
+ b.fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port)
+ .routeId("r2")
+ .setBody().simple("${routeId}")
+ .to("mock:r2");
+ b.fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port)
+ .routeId("r3")
+ .setBody().simple("${routeId}")
+ .to("mock:r3");
+
+ b.from("direct:start")
+ .setHeader("MyHeader").body()
+ .toF("undertow:http://localhost:%d", port);
+ });
+
+ context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1);
+ context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1);
context.start();
- assertThat(
- template.requestBody("direct:start", "h1", String.class)
- ).isEqualTo("r1");
- assertThat(
- template.requestBody("direct:start", "h2", String.class)
- ).isEqualTo("r2");
- assertThat(
- template.requestBody("direct:start", "t1", String.class)
- ).isEqualTo("r3");
- assertThat(
- template.requestBody("direct:start", "t2", String.class)
- ).isEqualTo("r3");
-
- m1.assertIsSatisfied();
- m2.assertIsSatisfied();
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
+ assertThat(template.requestBody("direct:start", "t1", String.class)).isEqualTo("r3");
+ assertThat(template.requestBody("direct:start", "t2", String.class)).isEqualTo("r3");
+
+ MockEndpoint.assertIsSatisfied(context);
}
@Test
void testWithRexFilters() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port)
- .routeId("r1")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class);
- from("direct:start")
- .setHeader("MyHeader").body()
- .toF("undertow:http://localhost:%d", port);
- }
- }
- );
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port)
+ .routeId("r1")
+ .setBody().simple("${routeId}");
+
+ b.from("direct:start")
+ .setHeader("MyHeader").body()
+ .toF("undertow:http://localhost:%d", port);
+ });
+
+ context.start();
+
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.request("direct:start", e -> e.getMessage().setBody("t1"))).satisfies(e -> {
+ assertThat(e.isFailed()).isTrue();
+ assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+ });
+ }
+
+ @Test
+ void testRemoveConsumer() throws Exception {
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port)
+ .routeId("r1")
+ .setBody().simple("${routeId}");
+ b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port)
+ .routeId("r2")
+ .setBody().simple("${routeId}");
+ });
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .setHeader("h").body()
+ .toF("undertow:http://localhost:%d", port);
+ });
context.start();
- assertThat(
- template.requestBody("direct:start", "h1", String.class)
- ).isEqualTo("r1");
- assertThatThrownBy(
- () -> template.requestBody("direct:start", "t1", String.class)
- ).isInstanceOf(CamelExecutionException.class).hasCauseExactlyInstanceOf(HttpOperationFailedException.class);
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
+
+ context.getRouteController().stopRoute("r2");
+
+ assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
+ assertThat(e.isFailed()).isTrue();
+ assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+ });
+ }
+
+ @Test
+ void testAddConsumer() throws Exception {
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port)
+ .routeId("r1")
+ .setBody().simple("${routeId}");
+ });
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .setHeader("h").body()
+ .toF("undertow:http://localhost:%d", port);
+ });
+
+ context.start();
+
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
+ assertThat(e.isFailed()).isTrue();
+ assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+ });
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port)
+ .routeId("r2")
+ .setBody().simple("${routeId}");
+ });
+
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
}
@Test
void testInvokeEndpoint() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- fromF("undertow:http://0.0.0.0:%d", port)
- .routeId("endpoint")
- .setBody().simple("${routeId}")
- .convertBodyTo(String.class)
- .to("mock:endpoint");
- from("direct:start")
- .toF("knative-http:0.0.0.0:%d", port);
- }
- }
- );
+ RouteBuilder.addRoutes(context, b -> {
+ b.fromF("undertow:http://0.0.0.0:%d", port)
+ .routeId("endpoint")
+ .setBody().simple("${routeId}")
+ .to("mock:endpoint");
+
+ b.from("direct:start")
+ .toF("knative-http:0.0.0.0:%d", port);
+ });
MockEndpoint mock = context.getEndpoint("mock:endpoint", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedBodiesReceived("endpoint");
mock.expectedHeaderReceived("Host", "0.0.0.0");
+ mock.expectedMessageCount(1);
context.start();
- template.requestBody("direct:start", "1", String.class);
+ template.sendBody("direct:start", "1");
mock.assertIsSatisfied();
}
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
index 1376c92..cb17060 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
@@ -96,7 +96,6 @@ public class CloudEventsV01Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.custom-event");
mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
@@ -104,6 +103,7 @@ public class CloudEventsV01Test {
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -166,7 +166,6 @@ public class CloudEventsV01Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
@@ -174,9 +173,9 @@ public class CloudEventsV01Test {
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessageCount(1);
mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock2.expectedHeaderReceived("CE-EventType", "my.type");
mock2.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
@@ -184,6 +183,7 @@ public class CloudEventsV01Test {
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
mock2.expectedBodiesReceived("test2");
+ mock2.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -238,7 +238,6 @@ public class CloudEventsV01Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -246,6 +245,7 @@ public class CloudEventsV01Test {
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -301,7 +301,6 @@ public class CloudEventsV01Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -309,6 +308,7 @@ public class CloudEventsV01Test {
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -387,22 +387,22 @@ public class CloudEventsV01Test {
context.start();
MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
- mock1.expectedMessageCount(1);
mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
mock1.expectedHeaderReceived("CE-Source", "CE1");
mock1.expectedBodiesReceived("test");
+ mock1.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessageCount(1);
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
mock2.expectedHeaderReceived("CE-Source", "CE2");
mock2.expectedBodiesReceived("test");
+ mock2.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
index 75bfc0f..f0d9607 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
@@ -96,7 +96,6 @@ public class CloudEventsV02Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock.expectedHeaderReceived("ce-type", "org.apache.camel.custom-event");
mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
@@ -104,6 +103,7 @@ public class CloudEventsV02Test {
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -166,7 +166,6 @@ public class CloudEventsV02Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
@@ -174,9 +173,9 @@ public class CloudEventsV02Test {
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessageCount(1);
mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock2.expectedHeaderReceived("ce-type", "my.type");
mock2.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
@@ -184,6 +183,7 @@ public class CloudEventsV02Test {
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
mock2.expectedBodiesReceived("test2");
+ mock2.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -238,7 +238,6 @@ public class CloudEventsV02Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock.expectedHeaderReceived("ce-id", "myEventID");
@@ -246,6 +245,7 @@ public class CloudEventsV02Test {
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -301,7 +301,6 @@ public class CloudEventsV02Test {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock.expectedHeaderReceived("ce-id", "myEventID");
@@ -309,6 +308,7 @@ public class CloudEventsV02Test {
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -387,22 +387,22 @@ public class CloudEventsV02Test {
context.start();
MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
- mock1.expectedMessageCount(1);
mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock1.expectedHeaderReceived("ce-id", "myEventID1");
mock1.expectedHeaderReceived("ce-source", "CE1");
mock1.expectedBodiesReceived("test");
+ mock1.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessageCount(1);
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock2.expectedHeaderReceived("ce-id", "myEventID2");
mock2.expectedHeaderReceived("ce-source", "CE2");
mock2.expectedBodiesReceived("test");
+ mock2.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
index da81f6a..996d43e 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
@@ -31,7 +31,6 @@ import org.apache.camel.component.knative.http.KnativeHttpEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.support.DefaultHeaderFilterStrategy;
import org.apache.camel.test.AvailablePortFinder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -348,7 +347,6 @@ public class KnativeComponentTest {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
@@ -356,13 +354,9 @@ public class KnativeComponentTest {
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setBody("test");
- }
- );
+ context.createProducerTemplate().sendBody("direct:source", "test");
mock.assertIsSatisfied();
}
@@ -403,7 +397,6 @@ public class KnativeComponentTest {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -411,6 +404,7 @@ public class KnativeComponentTest {
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -467,7 +461,6 @@ public class KnativeComponentTest {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -475,6 +468,7 @@ public class KnativeComponentTest {
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -553,22 +547,22 @@ public class KnativeComponentTest {
context.start();
MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
- mock1.expectedMessageCount(1);
mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock1.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
mock1.expectedHeaderReceived("CE-Source", "CE1");
mock1.expectedBodiesReceived("test");
+ mock1.expectedMessageCount(1);
MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessageCount(1);
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock2.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
mock2.expectedHeaderReceived("CE-Source", "CE2");
mock2.expectedBodiesReceived("test");
+ mock2.expectedMessageCount(1);
context.createProducerTemplate().send(
"direct:source",
@@ -596,112 +590,4 @@ public class KnativeComponentTest {
mock1.assertIsSatisfied();
mock2.assertIsSatisfied();
}
-
- @Test
- void testDefaultHeadersFilter() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .setHeader("CamelHeader")
- .constant("CamelHeaderValue")
- .setHeader("MyHeader")
- .constant("MyHeaderValue")
- .to("knative:endpoint/myEndpoint")
- .to("mock:source");
-
- fromF("undertow:http://localhost:%d", port)
- .setBody().constant("test");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:source", MockEndpoint.class);
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().sendBody("direct:source", "test");
-
- mock.assertIsSatisfied();
-
-
- assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).doesNotContainKey("CamelHeader");
- assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).containsEntry("MyHeader", "MyHeaderValue");
- }
-
- @Test
- void testCustomHeadersFilter() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- DefaultHeaderFilterStrategy hfs = new DefaultHeaderFilterStrategy();
- hfs.setOutFilterPattern("(?i)(My)[\\.|a-z|A-z|0-9]*");
- hfs.setInFilterPattern("(?i)(My)[\\.|a-z|A-z|0-9]*");
-
-
- context.getRegistry().bind("myFilterStrategy", hfs);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .setHeader("CamelHeader")
- .constant("CamelHeaderValue")
- .setHeader("MyHeader")
- .constant("MyHeaderValue")
- .to("knative:endpoint/myEndpoint?transport.headerFilterStrategy=#myFilterStrategy")
- .to("mock:source");
-
- fromF("undertow:http://localhost:%d?headerFilterStrategy=#myFilterStrategy", port)
- .setBody().constant("test");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:source", MockEndpoint.class);
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().sendBody("direct:source", "test");
-
- mock.assertIsSatisfied();
-
- assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).doesNotContainKey("MyHeader");
- assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).containsEntry("CamelHeader", "CamelHeaderValue");
- }
}
diff --git a/pom.xml b/pom.xml
index e9c0068..9458562 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
<immutables.version>2.7.5</immutables.version>
<semver4j.version>3.0.0</semver4j.version>
<undertow.version>1.4.26.Final</undertow.version>
+ <vertx.version>3.8.0</vertx.version>
<graalvm.version>19.1.1</graalvm.version>
<gmavenplus-plugin.version>1.7.1</gmavenplus-plugin.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>