You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/08/02 12:32:24 UTC
[camel-k-runtime] branch master updated: knative: true knative-http
component
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new 34db63b knative: true knative-http component
34db63b is described below
commit 34db63bfae88dafc74ae437448c9ea21a09b4588
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Jul 31 17:47:20 2019 +0200
knative: true knative-http component
---
.../maven/processors/CatalogProcessor3x.java | 6 +-
.../maven/processors/CatalogProcessor3Test.java | 13 +-
camel-knative-http/pom.xml | 55 ++-
.../camel/component/knative/http/KnativeHttp.java | 130 +++++++
.../component/knative/http/KnativeHttpBinding.java | 318 ++++++++++++++++
.../knative/http/KnativeHttpClientCallback.java | 242 ++++++++++++
.../knative/http/KnativeHttpComponent.java | 410 ++++-----------------
.../knative/http/KnativeHttpConsumer.java | 132 +++++++
.../knative/http/KnativeHttpDispatcher.java | 181 +++++++++
.../knative/http/KnativeHttpEndpoint.java | 164 +++++++++
.../http/KnativeHttpHeaderFilterStrategy.java | 49 +++
.../knative/http/KnativeHttpProducer.java | 131 +++++++
.../org/apache/camel/component/knative-http | 18 -
.../component/knative/http/KnativeHttpMain.java | 53 ---
.../component/knative/http/KnativeHttpTest.java | 216 +++++++++++
camel-knative/pom.xml | 12 +-
.../apache/camel/component/knative/Knative.java | 2 +
.../camel/component/knative/KnativeComponent.java | 2 +
.../camel/component/knative/KnativeEndpoint.java | 21 +-
.../services/org/apache/camel/component/knative | 18 -
.../component/knative/CloudEventsV01Test.java | 8 +-
.../component/knative/CloudEventsV02Test.java | 8 +-
.../component/knative/KnativeComponentTest.java | 40 +-
23 files changed, 1738 insertions(+), 491 deletions(-)
diff --git a/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java b/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java
index 962f95c..5fa5398 100644
--- a/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java
+++ b/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java
@@ -192,7 +192,8 @@ public class CatalogProcessor3x implements CatalogProcessor {
artifact.setArtifactId("camel-knative");
artifact.setVersion(project.getVersion());
artifact.createScheme("knative").setHttp(true);
- artifact.addDependency("org.apache.camel", "camel-netty4-http");
+ artifact.addDependency("org.apache.camel", "camel-cloud");
+ artifact.addDependency("org.apache.camel", "camel-http-common");
artifacts.put(artifact.getArtifactId(), artifact);
}
@@ -239,10 +240,11 @@ public class CatalogProcessor3x implements CatalogProcessor {
CamelArtifact artifact = new CamelArtifact();
artifact.setGroupId("org.apache.camel.k");
artifact.setArtifactId("camel-k-runtime-knative");
+ artifact.addDependency("org.apache.camel", "camel-cloud");
+ artifact.addDependency("org.apache.camel", "camel-http-common");
artifact.addDependency("org.apache.camel.k", "camel-k-loader-yaml");
artifact.addDependency("org.apache.camel.k", "camel-knative");
artifact.addDependency("org.apache.camel.k", "camel-knative-http");
- artifact.addDependency("org.apache.camel", "camel-netty4-http");
artifacts.put(artifact.getArtifactId(), artifact);
}
diff --git a/camel-k-maven-plugin/src/test/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3Test.java b/camel-k-maven-plugin/src/test/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3Test.java
index c1457ea..5051c12 100644
--- a/camel-k-maven-plugin/src/test/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3Test.java
+++ b/camel-k-maven-plugin/src/test/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3Test.java
@@ -102,19 +102,28 @@ public class CatalogProcessor3Test extends AbstractCataloProcessorTest {
d -> d.getGroupId().equals("org.apache.camel.k") && d.getArtifactId().equals("camel-knative")
);
assertThat(a.getDependencies()).anyMatch(
+ d -> d.getGroupId().equals("org.apache.camel.k") && d.getArtifactId().equals("camel-knative")
+ );
+ assertThat(a.getDependencies()).anyMatch(
d -> d.getGroupId().equals("org.apache.camel.k") && d.getArtifactId().equals("camel-knative-http")
);
assertThat(a.getDependencies()).anyMatch(
d -> d.getGroupId().equals("org.apache.camel.k") && d.getArtifactId().equals("camel-k-loader-yaml")
);
assertThat(a.getDependencies()).anyMatch(
- d -> d.getGroupId().equals("org.apache.camel") && d.getArtifactId().equals("camel-netty4-http")
+ d -> d.getGroupId().equals("org.apache.camel") && d.getArtifactId().equals("camel-cloud")
+ );
+ assertThat(a.getDependencies()).anyMatch(
+ d -> d.getGroupId().equals("org.apache.camel") && d.getArtifactId().equals("camel-http-common")
);
});
assertThat(artifactMap.get("camel-knative")).satisfies(a -> {
assertThat(a.getDependencies()).anyMatch(
- d -> d.getGroupId().equals("org.apache.camel") && d.getArtifactId().equals("camel-netty4-http")
+ d -> d.getGroupId().equals("org.apache.camel") && d.getArtifactId().equals("camel-cloud")
+ );
+ assertThat(a.getDependencies()).anyMatch(
+ d -> d.getGroupId().equals("org.apache.camel") && d.getArtifactId().equals("camel-http-common")
);
});
diff --git a/camel-knative-http/pom.xml b/camel-knative-http/pom.xml
index 39ecf0a..08c7a42 100644
--- a/camel-knative-http/pom.xml
+++ b/camel-knative-http/pom.xml
@@ -48,15 +48,26 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-netty4-http</artifactId>
+ <artifactId>camel-http-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-core</artifactId>
+ <version>${undertow.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>spi-annotations</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>apt</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -87,6 +98,26 @@
<artifactId>camel-properties</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-direct</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-log</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-undertow</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -122,4 +153,26 @@
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-package-maven-plugin</artifactId>
+ <version>${camel.version}</version>
+ <configuration>
+ <failFast>false</failFast>
+ </configuration>
+ <executions>
+ <execution>
+ <id>generate</id>
+ <goals>
+ <goal>prepare-components</goal>
+ </goals>
+ <phase>process-classes</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
index 0f4f460..ac5c713 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
@@ -16,7 +16,137 @@
*/
package org.apache.camel.component.knative.http;
+import java.util.Objects;
+import javax.net.ssl.SSLContext;
+
public final class KnativeHttp {
private KnativeHttp() {
}
+
+ /**
+  * Key made of a host name, a port and the {@link SSLContext} supplied for them.
+  *
+  * <p>Only host and port take part in {@link #equals(Object)} and
+  * {@link #hashCode()}; the SSL context is carried along but never compared.
+  */
+ public static final class HostKey {
+     private final String host;
+     private final int port;
+     private final SSLContext sslContext;
+
+     /**
+      * @param host the host name
+      * @param port the port
+      * @param ssl the SSL context associated with this host, stored as given
+      */
+     public HostKey(String host, int port, SSLContext ssl) {
+         this.host = host;
+         this.port = port;
+         this.sslContext = ssl;
+     }
+
+     /** @return the host name given at construction time */
+     public String getHost() {
+         return host;
+     }
+
+     /** @return the port given at construction time */
+     public int getPort() {
+         return port;
+     }
+
+     /** @return the SSL context given at construction time */
+     public SSLContext getSslContext() {
+         return sslContext;
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+         HostKey key = (HostKey) o;
+         // Objects.equals keeps equals null-safe, in line with Objects.hash used by hashCode
+         return getPort() == key.getPort()
+             && Objects.equals(getHost(), key.getHost());
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(getHost(), getPort());
+     }
+ }
+
+
+ /**
+  * Mutable holder for the settings applied to an Undertow host. Every
+  * property is optional and stays {@code null} until its setter is called.
+  */
+ public static final class HostOptions {
+
+     private Integer workerThreads;
+     private Integer ioThreads;
+     private Integer bufferSize;
+     private Boolean directBuffers;
+     private Boolean http2Enabled;
+
+     /** @return the number of worker threads to use in a Undertow host */
+     public Integer getWorkerThreads() {
+         return this.workerThreads;
+     }
+
+     /** @param workerThreads the number of worker threads to use in a Undertow host */
+     public void setWorkerThreads(Integer workerThreads) {
+         this.workerThreads = workerThreads;
+     }
+
+     /** @return the number of io threads to use in a Undertow host */
+     public Integer getIoThreads() {
+         return this.ioThreads;
+     }
+
+     /** @param ioThreads the number of io threads to use in a Undertow host */
+     public void setIoThreads(Integer ioThreads) {
+         this.ioThreads = ioThreads;
+     }
+
+     /** @return the buffer size of the Undertow host */
+     public Integer getBufferSize() {
+         return this.bufferSize;
+     }
+
+     /** @param bufferSize the buffer size of the Undertow host */
+     public void setBufferSize(Integer bufferSize) {
+         this.bufferSize = bufferSize;
+     }
+
+     /** @return whether the Undertow host should use direct buffers */
+     public Boolean getDirectBuffers() {
+         return this.directBuffers;
+     }
+
+     /** @param directBuffers whether the Undertow host should use direct buffers */
+     public void setDirectBuffers(Boolean directBuffers) {
+         this.directBuffers = directBuffers;
+     }
+
+     /** @return whether the Undertow host should use the http2 protocol */
+     public Boolean getHttp2Enabled() {
+         return this.http2Enabled;
+     }
+
+     /** @param http2Enabled whether the Undertow host should use the http2 protocol */
+     public void setHttp2Enabled(Boolean http2Enabled) {
+         this.http2Enabled = http2Enabled;
+     }
+
+     @Override
+     public String toString() {
+         return "UndertowHostOptions{"
+             + "workerThreads=" + workerThreads
+             + ", ioThreads=" + ioThreads
+             + ", bufferSize=" + bufferSize
+             + ", directBuffers=" + directBuffers
+             + ", http2Enabled=" + http2Enabled
+             + '}';
+     }
+ }
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java
new file mode 100644
index 0000000..8dd6f34
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+import io.undertow.client.ClientExchange;
+import io.undertow.client.ClientRequest;
+import io.undertow.client.ClientResponse;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.Headers;
+import io.undertow.util.HttpString;
+import io.undertow.util.Methods;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ObjectHelper;
+import org.apache.camel.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xnio.channels.BlockingReadableByteChannel;
+import org.xnio.channels.StreamSourceChannel;
+
+public final class KnativeHttpBinding {
+ private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpBinding.class);
+
+ private final HeaderFilterStrategy headerFilterStrategy;
+ private final Boolean transferException;
+
+ /**
+ * Creates a binding with exception transfer disabled.
+ *
+ * @param headerFilterStrategy strategy used to filter headers; must not be null
+ */
+ public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
+ this(headerFilterStrategy, Boolean.FALSE);
+ }
+
+ /**
+ * Creates a binding.
+ *
+ * @param headerFilterStrategy strategy used to filter headers; a null value is rejected
+ * @param transferException when true, toHttpResponse writes the exception of a failed
+ * exchange as a serialized Java object instead of a plain text stack trace
+ */
+ public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy, Boolean transferException) {
+ this.headerFilterStrategy = Objects.requireNonNull(headerFilterStrategy, "headerFilterStrategy");
+ this.transferException = transferException;
+ }
+
+ /**
+  * Converts an incoming Undertow server request into a Camel message.
+  *
+  * <p>Headers are populated first. The request channel is only read for
+  * POST, PUT and PATCH requests; for any other method the body is set to
+  * {@code null}. The body is stored as a {@code byte[]}.
+  *
+  * @param httpExchange the Undertow server exchange to read from
+  * @param exchange the Camel exchange the message is created for
+  * @return the newly created message
+  * @throws Exception if reading the request channel fails
+  */
+ public Message toCamelMessage(HttpServerExchange httpExchange, Exchange exchange) throws Exception {
+     final Message message = new DefaultMessage(exchange.getContext());
+     populateCamelHeaders(httpExchange, message.getHeaders(), exchange);
+
+     final boolean bodyless = !Methods.POST.equals(httpExchange.getRequestMethod())
+         && !Methods.PUT.equals(httpExchange.getRequestMethod())
+         && !Methods.PATCH.equals(httpExchange.getRequestMethod());
+
+     if (bodyless) {
+         message.setBody(null);
+         return message;
+     }
+
+     message.setBody(readFromChannel(httpExchange.getRequestChannel()));
+     return message;
+ }
+
+ /**
+  * Converts the response of an Undertow client exchange into a Camel message.
+  *
+  * @param clientExchange the client exchange whose response is read
+  * @param exchange the Camel exchange the message is created for
+  * @return a message carrying the response headers and the response payload as {@code byte[]}
+  * @throws Exception if the headers cannot be populated or the response channel cannot be read
+  */
+ public Message toCamelMessage(ClientExchange clientExchange, Exchange exchange) throws Exception {
+     final Message message = new DefaultMessage(exchange.getContext());
+
+     // headers first, then the payload, both taken from the response side of the exchange
+     populateCamelHeaders(clientExchange.getResponse(), message.getHeaders(), exchange);
+     final byte[] payload = readFromChannel(clientExchange.getResponseChannel());
+     message.setBody(payload);
+
+     return message;
+ }
+
+ /**
+  * Copies the data of an incoming server request into a Camel header map.
+  *
+  * <p>The map receives, in this order: the request path with the endpoint's
+  * context path stripped (matched ignoring case), every request header and
+  * query parameter accepted by the header filter strategy, and finally the
+  * HTTP method, URL, URI and query string. Repeated names are accumulated
+  * through {@link #appendHeader(Map, String, Object)}.
+  *
+  * @param httpExchange the Undertow server exchange to read from
+  * @param headersMap the header map to fill
+  * @param exchange the Camel exchange; its from-endpoint must be a {@code KnativeHttpEndpoint}
+  */
+ public void populateCamelHeaders(HttpServerExchange httpExchange, Map<String, Object> headersMap, Exchange exchange) {
+     String path = httpExchange.getRequestPath();
+     KnativeHttpEndpoint endpoint = (KnativeHttpEndpoint) exchange.getFromEndpoint();
+     if (endpoint.getHttpURI() != null) {
+         // need to match by lower case as we want to ignore case on context-path
+         String endpointPath = endpoint.getHttpURI().getPath();
+         String matchPath = path.toLowerCase(Locale.US);
+         String match = endpointPath.toLowerCase(Locale.US);
+         if (matchPath.startsWith(match)) {
+             path = path.substring(endpointPath.length());
+         }
+     }
+     headersMap.put(Exchange.HTTP_PATH, path);
+
+     final HeaderMap requestHeaders = httpExchange.getRequestHeaders();
+     for (HttpString name : requestHeaders.getHeaderNames()) {
+         if (name.toString().toLowerCase(Locale.US).equals("content-type")) {
+             name = Headers.CONTENT_TYPE;
+         }
+
+         if (name.toString().toLowerCase(Locale.US).equals("authorization")) {
+             // read the first header value itself: the values collection renders as
+             // "[...]" through toString(), which can never start with "Basic"
+             String value = requestHeaders.getFirst(name);
+             // store a special header that this request was authenticated using HTTP Basic
+             if (value != null && value.trim().startsWith("Basic")) {
+                 if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
+                     appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
+                 }
+             }
+         }
+
+         // add the headers one by one, and use the header filter strategy
+         for (Object value : requestHeaders.get(name)) {
+             if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
+                 appendHeader(headersMap, name.toString(), value);
+             }
+         }
+     }
+
+     // process uri (query) parameters as headers
+     Map<String, Deque<String>> queryParameters = httpExchange.getQueryParameters();
+     for (Map.Entry<String, Deque<String>> entry : queryParameters.entrySet()) {
+         String name = entry.getKey();
+         for (Object value : entry.getValue()) {
+             if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
+                 appendHeader(headersMap, name, value);
+             }
+         }
+     }
+
+     headersMap.put(Exchange.HTTP_METHOD, httpExchange.getRequestMethod().toString());
+     headersMap.put(Exchange.HTTP_URL, httpExchange.getRequestURL());
+     headersMap.put(Exchange.HTTP_URI, httpExchange.getRequestURI());
+     headersMap.put(Exchange.HTTP_QUERY, httpExchange.getQueryString());
+     headersMap.put(Exchange.HTTP_RAW_QUERY, httpExchange.getQueryString());
+ }
+
+ /**
+  * Copies the status code and headers of a client response into a Camel header map.
+  *
+  * <p>The response code is stored first; each response header value is then
+  * added unless the header filter strategy rejects it. Repeated names are
+  * accumulated through {@link #appendHeader(Map, String, Object)}.
+  *
+  * @param response the Undertow client response to read from
+  * @param headersMap the header map to fill
+  * @param exchange the Camel exchange handed to the header filter strategy
+  * @throws Exception declared for callers; nothing in this method throws a checked exception itself
+  */
+ public void populateCamelHeaders(ClientResponse response, Map<String, Object> headersMap, Exchange exchange) throws Exception {
+     headersMap.put(Exchange.HTTP_RESPONSE_CODE, response.getResponseCode());
+
+     final HeaderMap responseHeaders = response.getResponseHeaders();
+     for (HttpString name : responseHeaders.getHeaderNames()) {
+         // mapping the content-type
+         if (name.toString().toLowerCase(Locale.US).equals("content-type")) {
+             name = Headers.CONTENT_TYPE;
+         }
+
+         if (name.toString().toLowerCase(Locale.US).equals("authorization")) {
+             // read the first header value itself: the values collection renders as
+             // "[...]" through toString(), which can never start with "Basic"
+             String value = responseHeaders.getFirst(name);
+             // store a special header that this request was authenticated using HTTP Basic
+             if (value != null && value.trim().startsWith("Basic")) {
+                 if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
+                     appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
+                 }
+             }
+         }
+
+         // add the headers one by one, and use the header filter strategy
+         for (Object value : responseHeaders.get(name)) {
+             if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
+                 appendHeader(headersMap, name.toString(), value);
+             }
+         }
+     }
+ }
+
+ /**
+  * Prepares an Undertow server response from a Camel message and returns the payload to write.
+  *
+  * <p>The status code comes from the {@code Exchange.HTTP_RESPONSE_CODE} header,
+  * defaulting to 500 for a failed exchange and 200 otherwise. Message headers
+  * accepted by the header filter strategy are added to the response. When the
+  * exchange carries an exception, the body is replaced by either the serialized
+  * exception (exception transfer enabled) or its stack trace as plain text, and
+  * the exchange is marked as failure handled.
+  *
+  * @param httpExchange the Undertow server exchange whose status and headers are set
+  * @param message the Camel message to convert
+  * @return the body to send: the message body, or a {@link ByteBuffer} describing the exception
+  * @throws IOException if serializing the exception fails
+  */
+ public Object toHttpResponse(HttpServerExchange httpExchange, Message message) throws IOException {
+     final boolean failed = message.getExchange().isFailed();
+     final int defaultCode = failed ? 500 : 200;
+     final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
+     final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
+
+     httpExchange.setStatusCode(code);
+
+     //copy headers from Message to Response
+     for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+         String key = entry.getKey();
+         Object value = entry.getValue();
+         // use an iterator as there can be multiple values. (must not use a delimiter)
+         for (Object it : ObjectHelper.createIterable(value, null)) {
+             String headerValue = tc.convertTo(String.class, it);
+             if (headerValue == null) {
+                 continue;
+             }
+             if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+                 httpExchange.getResponseHeaders().add(new HttpString(key), headerValue);
+             }
+         }
+     }
+
+     Object body = message.getBody();
+     Exception exception = message.getExchange().getException();
+
+     if (exception != null) {
+         // transferException is a nullable Boolean: compare instead of unboxing so that
+         // a null value means "do not transfer" rather than a NullPointerException
+         if (Boolean.TRUE.equals(transferException)) {
+             // we failed due an exception, and transfer it as java serialized object
+             ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos);
+             oos.writeObject(exception);
+             oos.flush();
+             IOHelper.close(oos, bos);
+
+             // the body should be the serialized java object of the exception
+             body = ByteBuffer.wrap(bos.toByteArray());
+             // force content type to be serialized java object
+             message.setHeader(Exchange.CONTENT_TYPE, "application/x-java-serialized-object");
+         } else {
+             // we failed due an exception so print it as plain text
+             StringWriter sw = new StringWriter();
+             PrintWriter pw = new PrintWriter(sw);
+             exception.printStackTrace(pw);
+
+             // the body should then be the stacktrace
+             body = ByteBuffer.wrap(sw.toString().getBytes());
+             // force content type to be text/plain as that is what the stacktrace is
+             message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
+         }
+
+         // and mark the exception as failure handled, as we handled it by returning it as the response
+         ExchangeHelper.setFailureHandled(message.getExchange());
+     }
+
+     // set the content type in the response; put() replaces any value added by the header copy above
+     String contentType = MessageHelper.getContentType(message);
+     if (contentType != null) {
+         httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType);
+         LOG.trace("Content-Type: {}", contentType);
+     }
+     return body;
+ }
+
+ /**
+  * Transfers the headers of a Camel message onto an outgoing Undertow client request.
+  *
+  * <p>The content type, when the message has one, is put on the request first.
+  * Every message header value that converts to a non-null string and is accepted
+  * by the header filter strategy is then added to the request headers.
+  *
+  * @param clientRequest the request whose headers are filled
+  * @param message the Camel message providing headers and body
+  * @return the message body, untouched
+  */
+ public Object toHttpRequest(ClientRequest clientRequest, Message message) {
+     final Object payload = message.getBody();
+     final HeaderMap target = clientRequest.getRequestHeaders();
+
+     // content type of the request goes first
+     final String contentType = MessageHelper.getContentType(message);
+     if (contentType != null) {
+         target.put(Headers.CONTENT_TYPE, contentType);
+     }
+
+     final TypeConverter converter = message.getExchange().getContext().getTypeConverter();
+
+     for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
+         final String headerName = header.getKey();
+         // a header may hold several values; iterate them without splitting on a delimiter
+         for (Object element : ObjectHelper.createIterable(header.getValue(), null)) {
+             final String text = converter.convertTo(String.class, element);
+             if (text != null && !headerFilterStrategy.applyFilterToCamelHeaders(headerName, text, message.getExchange())) {
+                 target.add(new HttpString(headerName), text);
+             }
+         }
+     }
+
+     return payload;
+ }
+
+ /**
+  * Stores a header value, accumulating values when the key is already present.
+  *
+  * <p>A key that is not yet in the map is stored as-is. For an existing key,
+  * the current value is reused when it is a {@link List} (and modified in
+  * place), otherwise it is wrapped in a new list; the new value is appended
+  * and the list becomes the mapped value.
+  *
+  * @param headers the header map to update
+  * @param key the header name
+  * @param value the value to store or append
+  */
+ @SuppressWarnings("unchecked")
+ public static void appendHeader(Map<String, Object> headers, String key, Object value) {
+     if (!headers.containsKey(key)) {
+         headers.put(key, value);
+         return;
+     }
+
+     final Object current = headers.get(key);
+     final List<Object> values;
+     if (current instanceof List) {
+         values = (List<Object>) current;
+     } else {
+         values = new ArrayList<>();
+         values.add(current);
+     }
+
+     values.add(value);
+     headers.put(key, values);
+ }
+
+ /**
+  * Reads a stream source channel to its end through a blocking wrapper.
+  *
+  * @param source the channel to drain
+  * @return all bytes read until the channel reported end of stream
+  * @throws IOException if reading from the channel fails
+  */
+ public static byte[] readFromChannel(StreamSourceChannel source) throws IOException {
+     final ByteArrayOutputStream out = new ByteArrayOutputStream();
+     final ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
+     final ReadableByteChannel blockingSource = new BlockingReadableByteChannel(source);
+
+     for (;;) {
+         int res = blockingSource.read(buffer);
+         if (res == -1) {
+             return out.toByteArray();
+         } else if (res == 0) {
+             LOG.error("Channel did not block");
+         } else {
+             buffer.flip();
+             // the third argument of write() is a length, not an end index
+             out.write(
+                 buffer.array(),
+                 buffer.arrayOffset() + buffer.position(),
+                 buffer.remaining());
+             buffer.clear();
+         }
+     }
+ }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java
new file mode 100644
index 0000000..4adf801
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import io.undertow.client.ClientCallback;
+import io.undertow.client.ClientConnection;
+import io.undertow.client.ClientExchange;
+import io.undertow.client.ClientRequest;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.http.common.HttpHelper;
+import org.apache.camel.http.common.HttpOperationFailedException;
+import org.apache.camel.support.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xnio.ChannelExceptionHandler;
+import org.xnio.ChannelListener;
+import org.xnio.ChannelListeners;
+import org.xnio.IoUtils;
+import org.xnio.channels.StreamSinkChannel;
+
+
+class KnativeHttpClientCallback implements ClientCallback<ClientConnection> {
+ private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpClientCallback.class);
+
+ private final ByteBuffer body;
+ private final AsyncCallback callback;
+ private final BlockingDeque<Closeable> closeables;
+ private final KnativeHttpEndpoint endpoint;
+ private final Exchange exchange;
+ private final ClientRequest request;
+
+ /**
+ * Creates the callback driving a single client request.
+ *
+ * @param exchange the Camel exchange that receives the response or the failure
+ * @param callback the Camel callback signalled once processing has finished
+ * @param endpoint the endpoint providing the header filter strategy, URI and failure policy
+ * @param request the client request to send once the connection is established
+ * @param body the request payload to write; when null nothing is written by writeRequest
+ */
+ KnativeHttpClientCallback(Exchange exchange, AsyncCallback callback, KnativeHttpEndpoint endpoint, ClientRequest request, ByteBuffer body) {
+ // holds the resources that finish() closes
+ this.closeables = new LinkedBlockingDeque<>();
+ this.exchange = exchange;
+ this.callback = callback;
+ this.endpoint = endpoint;
+ this.request = request;
+ this.body = body;
+ }
+
+ /**
+ * Invoked with the established connection: registers it for closing and sends the request.
+ *
+ * @param connection the established client connection
+ */
+ @Override
+ public void completed(final ClientConnection connection) {
+ // we have established connection, make sure we close it
+ deferClose(connection);
+
+ // now we can send the request and perform the exchange: writing the
+ // request and reading the response
+ connection.sendRequest(request, on(this::performClientExchange));
+ }
+
+ /**
+ * Invoked when the operation this callback was registered for failed; ends the exchange through hasFailedWith.
+ *
+ * @param e the I/O failure
+ */
+ @Override
+ public void failed(final IOException e) {
+ hasFailedWith(e);
+ }
+
+ /**
+  * Creates a write listener that keeps writing the given payload when it is invoked.
+  *
+  * <p>Each invocation writes as much as the channel accepts. While bytes
+  * remain, writes are resumed on the channel; once the payload is fully
+  * written the channel is flushed. An I/O error fails the exchange.
+  *
+  * @param body the payload still to be written
+  * @return the listener to install on the request channel
+  */
+ private ChannelListener<StreamSinkChannel> asyncWriter(final ByteBuffer body) {
+     return sink -> {
+         try {
+             write(sink, body);
+
+             if (!body.hasRemaining()) {
+                 flush(sink);
+             } else {
+                 sink.resumeWrites();
+             }
+         } catch (final IOException failure) {
+             hasFailedWith(failure);
+         }
+     };
+ }
+
+ /**
+  * Registers a resource to be closed when the exchange is finished.
+  *
+  * <p>If the calling thread is interrupted while registering, the interrupt
+  * flag is restored and the exchange is failed.
+  *
+  * @param closeable the resource to close in {@code finish}
+  */
+ private void deferClose(final Closeable closeable) {
+     try {
+         closeables.putFirst(closeable);
+     } catch (final InterruptedException e) {
+         // restore the interrupt flag so code further up the stack can still observe it
+         Thread.currentThread().interrupt();
+         hasFailedWith(e);
+     }
+ }
+
+ /**
+  * Ends the Camel exchange: closes the deferred resources, publishes the
+  * result message if there is one, and signals the async callback.
+  *
+  * @param result the response message, or {@code null} when there is nothing to publish
+  */
+ private void finish(final Message result) {
+     // release everything registered through deferClose
+     for (final Closeable resource : closeables) {
+         IoUtils.safeClose(resource);
+     }
+
+     if (result != null) {
+         // out-capable exchanges get the result as OUT, all others as IN
+         final boolean outCapable = ExchangeHelper.isOutCapable(exchange);
+         if (!outCapable) {
+             exchange.setIn(result);
+         } else {
+             exchange.setOut(result);
+         }
+     }
+
+     callback.done(false);
+ }
+
+ /**
+ * Ends the exchange after a failure.
+ *
+ * The failure is only recorded on the exchange when the endpoint's
+ * throwExceptionOnFailure option is true; in every case the exchange is
+ * finished without a result message.
+ *
+ * @param e the failure that occurred
+ */
+ private void hasFailedWith(final Throwable e) {
+ LOG.trace("Exchange has failed with", e);
+ // null-safe check: a missing option is treated like false
+ if (Boolean.TRUE.equals(endpoint.getThrowExceptionOnFailure())) {
+ exchange.setException(e);
+ }
+
+ finish(null);
+ }
+
+ /**
+ * Adapts a completion handler to a ClientCallback whose failures are routed to hasFailedWith.
+ *
+ * @param completed invoked with the result on success
+ * @return the adapting callback
+ */
+ private <T> ClientCallback<T> on(final Consumer<T> completed) {
+ return on(completed, this::hasFailedWith);
+ }
+
+ /**
+  * Adapts a pair of handlers to a {@link ClientCallback}.
+  *
+  * @param completed invoked with the result on success
+  * @param failed invoked with the I/O failure on error
+  * @return a callback delegating to the two handlers
+  */
+ private <T> ClientCallback<T> on(Consumer<T> completed, Consumer<IOException> failed) {
+     return new ClientCallback<T>() {
+         @Override
+         public void completed(final T result) {
+             completed.accept(result);
+         }
+
+         @Override
+         public void failed(final IOException e) {
+             // delegate to the handler: an unqualified failed(e) resolves to this very
+             // method and would recurse until the stack overflows
+             failed.accept(e);
+         }
+     };
+ }
+
+ /**
+ * Runs the request/response exchange once the request has been sent.
+ *
+ * The response listener is installed before the request body is written.
+ *
+ * @param clientExchange the client exchange created for the request
+ */
+ private void performClientExchange(final ClientExchange clientExchange) {
+ // add response listener to the exchange, we could receive the response
+ // at any time (async)
+ setupResponseListener(clientExchange);
+
+ // write the request
+ writeRequest(clientExchange, body);
+ }
+
+ /**
+  * Installs the listener that turns the HTTP response into the Camel result.
+  *
+  * <p>The response is converted to a Camel message. If the status code is
+  * outside 200-299 and the endpoint's throwExceptionOnFailure option is true,
+  * the message is set on the exchange and the exchange fails with an
+  * {@link HttpOperationFailedException}; otherwise the exchange is finished
+  * with the message. Any error raised while handling the response fails the
+  * exchange.
+  *
+  * @param clientExchange the client exchange to listen on
+  */
+ private void setupResponseListener(final ClientExchange clientExchange) {
+     clientExchange.setResponseListener(on(response -> {
+         try {
+             final KnativeHttpBinding binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+             final Message result = binding.toCamelMessage(clientExchange, exchange);
+             final int code = clientExchange.getResponse().getResponseCode();
+             // null-safe, consistent with hasFailedWith: a missing option is treated like false
+             final boolean throwOnFailure = Boolean.TRUE.equals(endpoint.getThrowExceptionOnFailure());
+
+             if (!HttpHelper.isStatusCodeOk(code, "200-299") && throwOnFailure) {
+                 // operation failed so populate exception to throw
+                 final String uri = endpoint.getHttpURI().toString();
+                 final String statusText = clientExchange.getResponse().getStatus();
+
+                 // Convert Message headers (Map<String, Object>) to Map<String, String> as expected by
+                 // HttpOperationsFailedException using Message versus clientExchange as its header values
+                 // have extra formatting. String.valueOf tolerates a null header value.
+                 final Map<String, String> headers = result.getHeaders().entrySet()
+                     .stream()
+                     .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
+
+                 // Since result (Message) isn't associated with an Exchange yet, you can not use result.getBody(String.class)
+                 final String bodyText = ExchangeHelper.convertToType(exchange, String.class, result.getBody());
+                 final Exception cause = new HttpOperationFailedException(uri, code, statusText, null, headers, bodyText);
+
+                 if (ExchangeHelper.isOutCapable(exchange)) {
+                     exchange.setOut(result);
+                 } else {
+                     exchange.setIn(result);
+                 }
+
+                 // make sure to fail with HttpOperationFailedException
+                 hasFailedWith(cause);
+             } else {
+                 // we end Camel exchange here
+                 finish(result);
+             }
+         } catch (Throwable e) {
+             hasFailedWith(e);
+         }
+     }));
+ }
+
+ /**
+  * Writes the request payload to the request channel of the client exchange.
+  *
+  * <p>Nothing is written when the payload is {@code null}. Otherwise as much
+  * as possible is written right away; what is left over is handed to an
+  * asynchronous write listener, and a fully written payload is flushed.
+  * An I/O error fails the exchange.
+  *
+  * @param clientExchange the exchange providing the request channel
+  * @param body the payload to write, may be {@code null}
+  */
+ private void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) {
+     final StreamSinkChannel sink = clientExchange.getRequestChannel();
+     if (body == null) {
+         return;
+     }
+
+     try {
+         // first attempt: we may already be on the IO thread with a writable socket
+         write(sink, body);
+
+         if (!body.hasRemaining()) {
+             // everything went out, the request only needs flushing
+             flush(sink);
+         } else {
+             // part (or all) of the payload is still pending: continue asynchronously
+             sink.getWriteSetter().set(asyncWriter(body));
+             sink.resumeWrites();
+         }
+     } catch (final IOException failure) {
+         hasFailedWith(failure);
+     }
+ }
+
+ /**
+ * Shuts down writes on the channel and flushes it.
+ *
+ * When the flush does not complete right away, a flushing listener is
+ * installed as write listener and writes are resumed so that flushing
+ * continues later.
+ *
+ * @param channel the channel to flush
+ * @throws IOException if shutting down writes or flushing fails
+ */
+ private static void flush(final StreamSinkChannel channel) throws IOException {
+ // the canonical way of flushing Xnio channels
+ channel.shutdownWrites();
+ if (!channel.flush()) {
+ // flush is still pending: build a listener from a safe-close delegate and a closing exception handler
+ final ChannelListener<StreamSinkChannel> safeClose = IoUtils::safeClose;
+ final ChannelExceptionHandler<Channel> closingChannelExceptionHandler = ChannelListeners
+ .closingChannelExceptionHandler();
+ final ChannelListener<StreamSinkChannel> flushingChannelListener = ChannelListeners
+ .flushingChannelListener(safeClose, closingChannelExceptionHandler);
+ channel.getWriteSetter().set(flushingChannelListener);
+ channel.resumeWrites();
+ }
+ }
+
+ /**
+  * Writes the payload to the channel until it is exhausted or the channel
+  * stops accepting bytes (a write returning zero or less).
+  *
+  * @param channel the channel to write to
+  * @param body the payload; its position advances by the number of bytes written
+  * @throws IOException if the channel write fails
+  */
+ private static void write(final StreamSinkChannel channel, final ByteBuffer body) throws IOException {
+     while (body.hasRemaining()) {
+         if (channel.write(body) <= 0) {
+             // the channel took nothing this round: leave the remainder to the caller
+             break;
+         }
+     }
+ }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
index b2833b4..bea70a5 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
@@ -16,381 +16,95 @@
*/
package org.apache.camel.component.knative.http;
-import java.net.URI;
-import java.nio.channels.ClosedChannelException;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.HttpContent;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
+import io.undertow.predicate.Predicate;
+import io.undertow.server.HttpHandler;
import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
-import org.apache.camel.component.netty4.NettyConverter;
-import org.apache.camel.component.netty4.http.DefaultNettyHttpBinding;
-import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory;
-import org.apache.camel.component.netty4.http.NettyHttpBinding;
-import org.apache.camel.component.netty4.http.NettyHttpComponent;
-import org.apache.camel.component.netty4.http.NettyHttpConfiguration;
-import org.apache.camel.component.netty4.http.NettyHttpConsumer;
-import org.apache.camel.component.netty4.http.NettyHttpHelper;
-import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
-import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.RestConsumerContextPathMatcher;
-import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.URISupport;
-import org.apache.camel.util.UnsafeUriCharactersEncoder;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class KnativeHttpComponent extends NettyHttpComponent {
+@Component("knative-http")
+public class KnativeHttpComponent extends DefaultComponent {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class);
- private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>();
-
- public KnativeHttpComponent() {
- super();
- setNettyHttpBinding(new KnativeNettyHttpBinding(getHeaderFilterStrategy()));
- }
+ private final Map<KnativeHttp.HostKey, KnativeHttpDispatcher> registry;
- @Override
- public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) {
- return handlers.computeIfAbsent(port, Handler::new);
- }
-
- @Override
- protected void doStop() throws Exception {
- super.doStop();
+ @Metadata(label = "advanced")
+ private KnativeHttp.HostOptions hostOptions;
- ServiceHelper.startService(handlers.values());
- handlers.clear();
+ /**
+ * Creates the component with an empty, concurrent registry that maps
+ * host keys to their dispatchers.
+ */
+ public KnativeHttpComponent() {
+ this.registry = new ConcurrentHashMap<>();
}
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- return super.createEndpoint(uri, remaining, parameters);
- }
-
- @ChannelHandler.Sharable
- private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory {
- private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
- private static final AttributeKey<HttpServerChannelHandler> SERVER_HANDLER_KEY = AttributeKey.valueOf("serverHandler");
-
- private final Set<HttpServerChannelHandler> consumers;
- private final int port;
- private final String token;
- private final int len;
-
- public Handler(int port) {
- this.consumers = new CopyOnWriteArraySet<>();
- this.port = port;
- this.token = ":" + port;
- this.len = token.length();
- }
-
- public void init(int port) {
- }
-
- public void addConsumer(NettyHttpConsumer consumer) {
- consumers.add(new HttpServerChannelHandler(consumer));
- }
-
- public void removeConsumer(NettyHttpConsumer consumer) {
- consumers.removeIf(h -> h.getConsumer() == consumer);
- }
-
- public int consumers() {
- return consumers.size();
- }
-
- public int getPort() {
- return port;
- }
-
- public ChannelHandler getChannelHandler() {
- return this;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- // store request, as this channel handler is created per pipeline
- HttpRequest request = (HttpRequest) msg;
-
- LOG.debug("Message received: {}", request);
-
- HttpServerChannelHandler handler = getHandler(request, request.method().name());
- if (handler != null) {
- Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY);
- // store handler as attachment
- attr.set(handler);
- if (msg instanceof HttpContent) {
- // need to hold the reference of content
- HttpContent httpContent = (HttpContent) msg;
- httpContent.content().retain();
- }
- handler.channelRead(ctx, request);
- } else {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
- response.headers().set(Exchange.CONTENT_TYPE, "text/plain");
- response.headers().set(Exchange.CONTENT_LENGTH, 0);
- ctx.writeAndFlush(response);
- ctx.close();
- }
- }
+ final Pattern pattern = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)");
+ final Matcher matcher = pattern.matcher(remaining);
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY);
- HttpServerChannelHandler handler = attr.get();
- if (handler != null) {
- handler.exceptionCaught(ctx, cause);
- } else {
- if (cause instanceof ClosedChannelException) {
- // The channel is closed so we do nothing here
- LOG.debug("Channel already closed. Ignoring this exception.");
- return;
- } else {
- // we cannot throw the exception here
- LOG.warn("HttpServerChannelHandler is not found as attachment to handle exception, send 404 back to the client.", cause);
- // Now we just send 404 back to the client
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
- response.headers().set(Exchange.CONTENT_TYPE, "text/plain");
- response.headers().set(Exchange.CONTENT_LENGTH, 0);
- ctx.writeAndFlush(response);
- ctx.close();
- }
- }
+ if (!matcher.find()) {
+ throw new IllegalArgumentException("Bad URI: " + remaining);
}
- private boolean isHttpMethodAllowed(HttpRequest request, String method) {
- return getHandler(request, method) != null;
+        // The pattern declares exactly three capturing groups and
+        // Matcher#groupCount() reports the groups of the pattern, not the
+        // ones that took part in the match, so it is always 3: host and port
+        // are mandatory and the (possibly empty) path is always available.
+        final String host = matcher.group(1);
+        final String path = "/" + matcher.group(3);
+        final int port;
+
+        try {
+            port = Integer.parseInt(matcher.group(2));
+        } catch (NumberFormatException e) {
+            // \d+ also accepts digit runs that do not fit into an int, report
+            // them the same way as any other malformed URI
+            throw new IllegalArgumentException("Bad URI: " + remaining, e);
}
- private HttpServerChannelHandler getHandler(HttpRequest request, String method) {
- HttpServerChannelHandler answer = null;
+ KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this);
+ ep.setHost(host);
+ ep.setPort(port);
+ ep.setPath(path);
+ ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true));
- // need to strip out host and port etc, as we only need the context-path for matching
- if (method == null) {
- return null;
- }
-
- String path = request.uri();
- int idx = path.indexOf(token);
- if (idx > -1) {
- path = path.substring(idx + len);
- }
- // use the path as key to find the consumer handler to use
- path = pathAsKey(path);
-
- // fallback to regular matching
- if (answer == null) {
- for (final HttpServerChannelHandler handler : consumers) {
- try {
- final NettyHttpConsumer consumer = handler.getConsumer();
- final HttpHeaders headers = request.headers();
- final String uri = consumer.getEndpoint().getEndpointUri();
- final Map<String, Object> params = URISupport.parseParameters(URI.create(uri));
-
- if (params.containsKey("filter.headerName") && params.containsKey("filter.headerValue")) {
- final String filterKey = (String) params.get("filter.headerName");
- final String filterVal = (String) params.get("filter.headerValue");
- final String headerVal = headers.get(filterKey);
-
- if (ObjectHelper.isEmpty(headerVal)) {
- continue;
- }
- if (!ObjectHelper.equal(filterVal, headerVal)) {
- continue;
- }
- }
-
- String consumerPath = consumer.getConfiguration().getPath();
- boolean matchOnUriPrefix = consumer.getEndpoint().getConfiguration().isMatchOnUriPrefix();
- // Just make sure the we get the right consumer path first
- if (RestConsumerContextPathMatcher.matchPath(path, consumerPath, matchOnUriPrefix)) {
- answer = handler;
- break;
- }
- } catch (Exception e) {
- throw RuntimeCamelException.wrapRuntimeCamelException(e);
- }
- }
- }
-
- return answer;
- }
-
- private static String pathAsKey(String path) {
- // cater for default path
- if (path == null || path.equals("/")) {
- path = "";
- }
-
- // strip out query parameters
- int idx = path.indexOf('?');
- if (idx > -1) {
- path = path.substring(0, idx);
- }
-
- // strip of ending /
- if (path.endsWith("/")) {
- path = path.substring(0, path.length() - 1);
- }
-
- return UnsafeUriCharactersEncoder.encodeHttpURI(path);
- }
+ setProperties(ep, parameters);
+ return ep;
}
+ /**
+ * Returns the host options handed to every dispatcher created by this
+ * component, or {@code null} when none have been set.
+ */
+ public KnativeHttp.HostOptions getHostOptions() {
+ return hostOptions;
+ }
- /**
- * Default {@link NettyHttpBinding}.
- */
- public class KnativeNettyHttpBinding extends DefaultNettyHttpBinding {
- public KnativeNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
- super(headerFilterStrategy);
- }
-
- @Override
- public HttpRequest toNettyRequest(Message message, String uri, NettyHttpConfiguration configuration) throws Exception {
- LOGGER.trace("toNettyRequest: {}", message);
-
- // the message body may already be a Netty HTTP response
- if (message.getBody() instanceof HttpRequest) {
- return (HttpRequest) message.getBody();
- }
-
- String uriForRequest = uri;
- if (configuration.isUseRelativePath()) {
- uriForRequest = URISupport.pathAndQueryOf(new URI(uriForRequest));
- }
-
- // just assume GET for now, we will later change that to the actual method to use
- HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriForRequest);
-
- Object body = message.getBody();
- if (body != null) {
- // support bodies as native Netty
- ByteBuf buffer;
- if (body instanceof ByteBuf) {
- buffer = (ByteBuf) body;
- } else {
- // try to convert to buffer first
- buffer = message.getBody(ByteBuf.class);
- if (buffer == null) {
- // fallback to byte array as last resort
- byte[] data = message.getMandatoryBody(byte[].class);
- buffer = NettyConverter.toByteBuffer(data);
- }
- }
- if (buffer != null) {
- request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriForRequest, buffer);
- int len = buffer.readableBytes();
- // set content-length
- request.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len);
- LOGGER.trace("Content-Length: {}", len);
- } else {
- // we do not support this kind of body
- throw new NoTypeConversionAvailableException(body, ByteBuf.class);
- }
- }
-
- // update HTTP method accordingly as we know if we have a body or not
- HttpMethod method = NettyHttpHelper.createMethod(message, body != null);
- request.setMethod(method);
-
- TypeConverter tc = message.getExchange().getContext().getTypeConverter();
-
- // if we bridge endpoint then we need to skip matching headers with the HTTP_QUERY to avoid sending
- // duplicated headers to the receiver, so use this skipRequestHeaders as the list of headers to skip
- Map<String, Object> skipRequestHeaders = null;
- if (configuration.isBridgeEndpoint()) {
- String queryString = message.getHeader(Exchange.HTTP_QUERY, String.class);
- if (queryString != null) {
- skipRequestHeaders = URISupport.parseQuery(queryString, false, true);
- }
- // Need to remove the Host key as it should be not used
- message.getHeaders().remove("host");
- }
-
- // append headers
- // must use entrySet to ensure case of keys is preserved
- for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
- String key = entry.getKey();
- Object value = entry.getValue();
-
- // we should not add headers for the parameters in the uri if we bridge the endpoint
- // as then we would duplicate headers on both the endpoint uri, and in HTTP headers as well
- if (skipRequestHeaders != null && skipRequestHeaders.containsKey(key)) {
- continue;
- }
-
- // use an iterator as there can be multiple values. (must not use a delimiter)
- final Iterator<?> it = org.apache.camel.support.ObjectHelper.createIterator(value, null, true);
- while (it.hasNext()) {
- String headerValue = tc.convertTo(String.class, it.next());
-
- if (headerValue != null && getHeaderFilterStrategy() != null
- && !getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
- LOGGER.trace("HTTP-Header: {}={}", key, headerValue);
- request.headers().add(key, headerValue);
- }
- }
- }
+ /**
+ * Sets the host options passed to dispatchers this component creates.
+ *
+ * @param hostOptions the options to use, may be {@code null}
+ */
+ public void setHostOptions(KnativeHttp.HostOptions hostOptions) {
+ this.hostOptions = hostOptions;
+ }
- // set the content type in the response.
- String contentType = message.getHeader(Exchange.CONTENT_TYPE, String.class);
- if (contentType != null) {
- // set content-type
- request.headers().set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType);
- LOGGER.trace("Content-Type: {}", contentType);
- }
+    /**
+     * Registers {@code handler} on the dispatcher for {@code key}, creating
+     * the dispatcher if none exists yet.
+     *
+     * @param key identifies the dispatcher to bind to
+     * @param handler the handler to register
+     * @param predicate selects the requests routed to {@code handler}
+     */
+    public void bind(KnativeHttp.HostKey key, HttpHandler handler, Predicate predicate) {
+        final KnativeHttpDispatcher dispatcher = getUndertow(key);
+        dispatcher.bind(handler, predicate);
+    }
- // must include HOST header as required by HTTP 1.1
- // use URI as its faster than URL (no DNS lookup)
- URI u = new URI(uri);
- String hostHeader = u.getHost() + (u.getPort() == 80 ? "" : ":" + u.getPort());
- request.headers().set(HttpHeaderNames.HOST.toString(), hostHeader);
- LOGGER.trace("Host: {}", hostHeader);
+    /**
+     * Removes {@code handler} from the dispatcher registered for {@code key}.
+     *
+     * Does nothing when no dispatcher is registered for the key.
+     *
+     * @param key identifies the dispatcher to unbind from
+     * @param handler the handler to remove
+     */
+    public void unbind(KnativeHttp.HostKey key, HttpHandler handler) {
+        // plain lookup: unbinding must not create (and register) a new
+        // dispatcher as a side effect
+        final KnativeHttpDispatcher dispatcher = registry.get(key);
+        if (dispatcher != null) {
+            dispatcher.unbind(handler);
+        }
- // configure connection to accordingly to keep alive configuration
- // favor using the header from the message
- String connection = message.getHeader(HttpHeaderNames.CONNECTION.toString(), String.class);
- if (connection == null) {
- // fallback and use the keep alive from the configuration
- if (configuration.isKeepAlive()) {
- connection = HttpHeaderValues.KEEP_ALIVE.toString();
- } else {
- connection = HttpHeaderValues.CLOSE.toString();
- }
- }
- request.headers().set(HttpHeaderNames.CONNECTION.toString(), connection);
- LOGGER.trace("Connection: {}", connection);
+    }
- return request;
- }
+    /**
+     * Returns the dispatcher registered for {@code key}, creating it with the
+     * component's host options and registering it on first use.
+     */
+    private KnativeHttpDispatcher getUndertow(KnativeHttp.HostKey key) {
+        return registry.computeIfAbsent(key, hostKey -> new KnativeHttpDispatcher(hostKey, hostOptions));
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
new file mode 100644
index 0000000..26f057b
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import io.undertow.predicate.Predicates;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.HttpString;
+import io.undertow.util.MimeMappings;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Consumer that registers itself as an Undertow {@link HttpHandler} on the
+ * component's dispatcher and turns incoming HTTP requests into Camel
+ * exchanges.
+ *
+ * Requests are routed to this consumer when their path equals the endpoint
+ * path and every configured header filter entry matches the request headers.
+ */
+public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler {
+    private final KnativeHttpBinding binding;
+
+    public KnativeHttpConsumer(KnativeHttpEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+
+        this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+    }
+
+    @Override
+    public KnativeHttpEndpoint getEndpoint() {
+        return (KnativeHttpEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Binds this consumer to the dispatcher of the endpoint's host key, then
+     * starts the consumer.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        final KnativeHttpEndpoint endpoint = getEndpoint();
+        final KnativeHttpComponent component = endpoint.getComponent();
+        final KnativeHttp.HostKey key = endpoint.getHostKey();
+
+        component.bind(key, this, Predicates.and(
+            Predicates.path(endpoint.getPath()),
+            this::matchesHeaderFilter
+        ));
+
+        super.doStart();
+    }
+
+    /**
+     * Unbinds this consumer from the dispatcher, then stops the consumer.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        final KnativeHttpEndpoint endpoint = getEndpoint();
+        final KnativeHttpComponent component = endpoint.getComponent();
+        final KnativeHttp.HostKey key = endpoint.getHostKey();
+
+        component.unbind(key, this);
+
+        super.doStop();
+    }
+
+    /**
+     * Processes the HTTP request through the Camel route and sends the
+     * resulting message back as the HTTP response.
+     */
+    @Override
+    public void handleRequest(HttpServerExchange httpExchange) throws Exception {
+        //create new Exchange
+        //binding is used to extract header and payload(if available)
+        Exchange camelExchange = createExchange(httpExchange);
+
+        //Unit of Work to process the Exchange
+        createUoW(camelExchange);
+        try {
+            getProcessor().process(camelExchange);
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
+        } finally {
+            doneUoW(camelExchange);
+        }
+
+        Object body = binding.toHttpResponse(httpExchange, camelExchange.getMessage());
+        TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter();
+
+        if (body == null) {
+            httpExchange.getResponseHeaders().put(new HttpString(Exchange.CONTENT_TYPE), MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt"));
+            httpExchange.getResponseSender().send("No response available");
+        } else {
+            ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body);
+            httpExchange.getResponseSender().send(bodyAsByteBuffer);
+        }
+    }
+
+    /**
+     * Creates an InOut exchange whose IN message is built from the HTTP
+     * request and which carries the request charset.
+     *
+     * @param httpExchange the incoming HTTP exchange
+     * @return the Camel exchange to process
+     */
+    public Exchange createExchange(HttpServerExchange httpExchange) throws Exception {
+        Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
+        Message in = binding.toCamelMessage(httpExchange, exchange);
+
+        exchange.setProperty(Exchange.CHARSET_NAME, httpExchange.getRequestCharset());
+        in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, httpExchange.getRequestCharset());
+
+        exchange.setIn(in);
+        return exchange;
+    }
+
+    /**
+     * Tells whether the request headers satisfy the endpoint's header filter.
+     *
+     * Each filter value is compared with the first value of the header of the
+     * same name, either literally or as a regular expression. An empty filter
+     * accepts every request; a request missing a filtered header is rejected.
+     */
+    private boolean matchesHeaderFilter(HttpServerExchange httpExchange) {
+        final Map<String, Object> filter = getEndpoint().getHeaderFilter();
+        if (ObjectHelper.isEmpty(filter)) {
+            return true;
+        }
+
+        final HeaderMap headers = httpExchange.getRequestHeaders();
+
+        for (Map.Entry<String, Object> entry : filter.entrySet()) {
+            final String expected = entry.getValue().toString();
+            final String actual = headers.getFirst(entry.getKey());
+
+            // the header is absent from the request: it cannot match, and
+            // calling matches() on it would throw a NullPointerException
+            if (actual == null) {
+                return false;
+            }
+            if (!Objects.equals(expected, actual) && !actual.matches(expected)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java
new file mode 100644
index 0000000..2d774cd
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import io.undertow.Undertow;
+import io.undertow.UndertowOptions;
+import io.undertow.predicate.Predicate;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.server.handlers.PathHandler;
+import io.undertow.util.Headers;
+import io.undertow.util.StatusCodes;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ReferenceCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns the Undertow server for a single host key and dispatches every
+ * incoming request to the first bound handler whose predicate accepts it.
+ *
+ * The server start/stop callbacks are handed to a {@link ReferenceCount}
+ * that is retained for every handler added by {@link #bind} and released
+ * for every handler removed by {@link #unbind}.
+ */
+public final class KnativeHttpDispatcher implements HttpHandler {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpDispatcher.class);
+
+    private final KnativeHttp.HostKey key;
+    private final KnativeHttp.HostOptions options;
+    private final ReferenceCount refCnt;
+    private final Set<PredicatedHandlerWrapper> handlers;
+    private final Undertow undertow;
+    private final PathHandler handler;
+
+    /**
+     * @param key host, port and optional SSL context the server listens on
+     * @param option optional server tuning options, may be {@code null}
+     */
+    public KnativeHttpDispatcher(KnativeHttp.HostKey key, KnativeHttp.HostOptions option) {
+        this.handlers = new CopyOnWriteArraySet<>();
+        this.key = key;
+        this.options = option;
+        this.handler = new PathHandler(this);
+        // must come after key, options and handler are set: createUndertow reads them
+        this.undertow = createUndertow();
+        this.refCnt = ReferenceCount.on(this::startUndertow, this::stopUndertow);
+    }
+
+    /**
+     * Offers the exchange to the bound handlers in turn and replies with
+     * a 404 when none of them accepts it.
+     */
+    @Override
+    public void handleRequest(HttpServerExchange exchange) throws Exception {
+        LOGGER.debug("received exchange on path: {}, headers: {}",
+            exchange.getRelativePath(),
+            exchange.getRequestHeaders()
+        );
+
+        for (PredicatedHandlerWrapper handler: handlers) {
+            if (handler.dispatch(exchange)) {
+                return;
+            }
+        }
+
+        exchange.setStatusCode(StatusCodes.NOT_FOUND);
+        exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
+        exchange.getResponseSender().send("No matching condition found");
+    }
+
+    /**
+     * Adds the handler with its predicate; the reference count is retained
+     * only when the handler was not already bound.
+     */
+    public void bind(HttpHandler handler, Predicate predicate) {
+        if (handlers.add(new PredicatedHandlerWrapper(handler, predicate))) {
+            refCnt.retain();
+        }
+    }
+
+    /**
+     * Removes the handler (matched by identity); the reference count is
+     * released only when something was actually removed.
+     */
+    public void unbind(HttpHandler handler) {
+        if (handlers.removeIf(phw -> phw.handler == handler)) {
+            refCnt.release();
+        }
+    }
+
+    private void startUndertow() {
+        try {
+            LOGGER.info("Starting Undertow server on {}://{}:{}",
+                key.getSslContext() != null ? "https" : "http",
+                key.getHost(),
+                key.getPort()
+            );
+
+            undertow.start();
+        } catch (RuntimeException e) {
+            LOGGER.warn("Failed to start Undertow server on {}://{}:{}, reason: {}",
+                key.getSslContext() != null ? "https" : "http",
+                key.getHost(),
+                key.getPort(),
+                e.getMessage()
+            );
+
+            // release whatever was set up before the failure, then propagate
+            undertow.stop();
+
+            throw e;
+        }
+    }
+
+    private void stopUndertow() {
+        LOGGER.info("Stopping Undertow server on {}://{}:{}",
+            key.getSslContext() != null ? "https" : "http",
+            key.getHost(),
+            key.getPort());
+
+        undertow.stop();
+    }
+
+    /**
+     * Builds (without starting) the server: an HTTPS listener when the key
+     * carries an SSL context, a plain HTTP listener otherwise, plus any
+     * non-empty host option.
+     */
+    private Undertow createUndertow() {
+        Undertow.Builder builder = Undertow.builder();
+        if (key.getSslContext() != null) {
+            builder.addHttpsListener(key.getPort(), key.getHost(), key.getSslContext());
+        } else {
+            builder.addHttpListener(key.getPort(), key.getHost());
+        }
+
+        if (options != null) {
+            ObjectHelper.ifNotEmpty(options.getIoThreads(), builder::setIoThreads);
+            ObjectHelper.ifNotEmpty(options.getWorkerThreads(), builder::setWorkerThreads);
+            ObjectHelper.ifNotEmpty(options.getBufferSize(), builder::setBufferSize);
+            ObjectHelper.ifNotEmpty(options.getDirectBuffers(), builder::setDirectBuffers);
+            ObjectHelper.ifNotEmpty(options.getHttp2Enabled(), e -> builder.setServerOption(UndertowOptions.ENABLE_HTTP2, e));
+        }
+
+        // NOTE(review): handler is already a PathHandler, so this second
+        // wrapping looks redundant - confirm before simplifying
+        return builder.setHandler(new PathHandler(handler)).build();
+    }
+
+    /**
+     * Pairs a handler with the predicate guarding it. Equality and hash code
+     * are based on the handler only.
+     */
+    private static final class PredicatedHandlerWrapper {
+        private final HttpHandler handler;
+        private final Predicate predicate;
+
+        public PredicatedHandlerWrapper(HttpHandler handler, Predicate predicate) {
+            this.handler = ObjectHelper.notNull(handler, "handler");
+            this.predicate = ObjectHelper.notNull(predicate, "predicate");
+        }
+
+        /**
+         * Runs the handler when the predicate accepts the exchange.
+         *
+         * @return {@code true} if the exchange was handed to the handler
+         */
+        boolean dispatch(HttpServerExchange exchange) throws Exception {
+            if (predicate.resolve(exchange)) {
+                if (exchange.isInIoThread()) {
+                    // re-dispatch rather than run the handler on the IO thread
+                    exchange.dispatch(handler);
+                } else {
+                    handler.handleRequest(exchange);
+                }
+
+                return true;
+            }
+
+            LOGGER.debug("No handler for path: {}, headers: {}",
+                exchange.getRelativePath(),
+                exchange.getRequestHeaders()
+            );
+
+            return false;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PredicatedHandlerWrapper holder = (PredicatedHandlerWrapper) o;
+            return handler.equals(holder.handler);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(handler);
+        }
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
new file mode 100644
index 0000000..e02f736
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.jsse.SSLContextParameters;
+import org.apache.camel.util.ObjectHelper;
+import org.xnio.OptionMap;
+
+/**
+ * Endpoint of the knative-http component, addressed as
+ * {@code knative-http:host:port/path}.
+ */
+@UriEndpoint(
+    firstVersion = "3.0.0",
+    scheme = "knative-http",
+    title = "KnativeHttp",
+    syntax = "knative-http:host:port/path",
+    label = "http",
+    lenientProperties = true)
+public class KnativeHttpEndpoint extends DefaultEndpoint {
+    @UriPath
+    @Metadata(required = true)
+    private String host;
+    @UriPath
+    @Metadata(required = true)
+    private int port;
+    @UriPath
+    private String path;
+
+    @UriParam(label = "advanced")
+    private HeaderFilterStrategy headerFilterStrategy;
+    @UriParam(label = "security")
+    private SSLContextParameters sslContextParameters;
+
+    @UriParam(label = "consumer")
+    private Map<String, Object> headerFilter;
+    @UriParam(label = "producer", defaultValue = "true")
+    private Boolean throwExceptionOnFailure = Boolean.TRUE;
+
+    /**
+     * Creates the endpoint with a {@link KnativeHttpHeaderFilterStrategy} as
+     * its initial header filter strategy.
+     */
+    public KnativeHttpEndpoint(String uri, KnativeHttpComponent component) {
+        super(uri, component);
+
+        this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
+    }
+
+    // **********************************
+    //
+    // Properties
+    //
+    // **********************************
+
+    public String getHost() {
+        return this.host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return this.port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return this.headerFilterStrategy;
+    }
+
+    public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) {
+        this.headerFilterStrategy = headerFilterStrategy;
+    }
+
+    public Map<String, Object> getHeaderFilter() {
+        return this.headerFilter;
+    }
+
+    public void setHeaderFilter(Map<String, Object> headerFilter) {
+        this.headerFilter = headerFilter;
+    }
+
+    public SSLContextParameters getSslContextParameters() {
+        return this.sslContextParameters;
+    }
+
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
+    public Boolean getThrowExceptionOnFailure() {
+        return this.throwExceptionOnFailure;
+    }
+
+    public void setThrowExceptionOnFailure(Boolean throwExceptionOnFailure) {
+        this.throwExceptionOnFailure = throwExceptionOnFailure;
+    }
+
+    /**
+     * Builds the key for this endpoint's host and port; no SSL context is
+     * passed.
+     */
+    public KnativeHttp.HostKey getHostKey() {
+        return new KnativeHttp.HostKey(host, port, null);
+    }
+
+    /**
+     * Builds the {@code http://host:port[path]} URI of this endpoint; the
+     * path is appended only when it is not empty.
+     */
+    public URI getHttpURI() {
+        final StringBuilder target = new StringBuilder("http://").append(host).append(":").append(port);
+
+        if (ObjectHelper.isNotEmpty(path)) {
+            target.append(path);
+        }
+
+        return URI.create(target.toString());
+    }
+
+    // **********************************
+    //
+    // Impl
+    //
+    // **********************************
+
+    @Override
+    public KnativeHttpComponent getComponent() {
+        return (KnativeHttpComponent) super.getComponent();
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KnativeHttpProducer(this, OptionMap.EMPTY);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new KnativeHttpConsumer(this, processor);
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
new file mode 100644
index 0000000..96051c9
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+
+/**
+ * {@link DefaultHeaderFilterStrategy} pre-populated with the out-filter
+ * entries and out-filter pattern used by the knative-http component.
+ */
+public class KnativeHttpHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
+    /**
+     * Creates the strategy and immediately applies {@link #initialize()}.
+     */
+    public KnativeHttpHeaderFilterStrategy() {
+        initialize();
+    }
+
+    /**
+     * Registers the header names added to the out-filter, enables the
+     * lower-case option and installs the out-filter pattern.
+     */
+    protected void initialize() {
+        final String[] outFiltered = {
+            "content-length",
+            "content-type",
+            "host",
+            // Generic Message headers, see
+            // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.5
+            "cache-control",
+            "connection",
+            "date",
+            "pragma",
+            "trailer",
+            "transfer-encoding",
+            "upgrade",
+            "via",
+            "warning"
+        };
+
+        for (String headerName : outFiltered) {
+            getOutFilter().add(headerName);
+        }
+
+        setLowerCase(true);
+
+        // filter headers that begin with "Camel" or "org.apache.camel";
+        // case must be ignored for HTTP based transports.
+        // NOTE(review): inside the character class '|' is a literal and the
+        // range A-z also spans the characters [ \ ] ^ _ ` - kept as-is,
+        // confirm whether that is intended before tightening it.
+        setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
new file mode 100644
index 0000000..7d79ccd
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+
+import io.undertow.client.ClientRequest;
+import io.undertow.client.UndertowClient;
+import io.undertow.protocols.ssl.UndertowXnioSsl;
+import io.undertow.server.DefaultByteBufferPool;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.Headers;
+import io.undertow.util.Methods;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.jsse.SSLContextParameters;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xnio.OptionMap;
+import org.xnio.Xnio;
+import org.xnio.XnioWorker;
+import org.xnio.ssl.XnioSsl;
+
+/**
+ * Asynchronous producer that sends the IN message of an exchange as an HTTP
+ * POST to the URI exposed by its {@link KnativeHttpEndpoint}, using the
+ * Undertow client on top of an XNIO worker created when the producer starts.
+ */
+public class KnativeHttpProducer extends DefaultAsyncProducer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class);
+
+    private final OptionMap options;
+    private final KnativeHttpBinding binding;
+
+    // created in doStart(), released in doStop()
+    private UndertowClient client;
+    private DefaultByteBufferPool pool;
+    private XnioSsl ssl;
+    private XnioWorker worker;
+
+    /**
+     * @param endpoint the endpoint this producer sends to
+     * @param options the XNIO options used for the worker, the SSL support and the client connection
+     */
+    public KnativeHttpProducer(KnativeHttpEndpoint endpoint, OptionMap options) {
+        super(endpoint);
+        this.options = options;
+        this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+    }
+
+    @Override
+    public KnativeHttpEndpoint getEndpoint() {
+        return (KnativeHttpEndpoint)super.getEndpoint();
+    }
+
+    /**
+     * Builds the POST request for the given exchange and starts the connection.
+     *
+     * @param camelExchange the exchange whose IN message is sent
+     * @param callback the callback to notify once processing is complete
+     * @return {@code false} when the request was handed over to the client and
+     *         will complete asynchronously, {@code true} when preparing or
+     *         starting the request failed and the callback was already invoked
+     */
+    @Override
+    public boolean process(final Exchange camelExchange, final AsyncCallback callback) {
+        try {
+            final KnativeHttpEndpoint endpoint = getEndpoint();
+            final URI uri = endpoint.getHttpURI();
+            final String pathAndQuery = URISupport.pathAndQueryOf(uri);
+
+            final ClientRequest request = new ClientRequest();
+            request.setMethod(Methods.POST);
+            request.setPath(pathAndQuery);
+            request.getRequestHeaders().put(Headers.HOST, uri.getHost());
+
+            final Object body = binding.toHttpRequest(request, camelExchange.getIn());
+            final TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
+            final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body);
+
+            // tryConvertTo may return null, so guard against it before
+            // calling bodyAsByte.remaining()
+            if (body != null && bodyAsByte != null) {
+                request.getRequestHeaders().put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
+            }
+
+            // when the connection succeeds or fails, KnativeHttpClientCallback
+            // gets notified on an I/O thread run by the Xnio worker. Writing
+            // the request and reading the response is also performed in the
+            // callback
+            client.connect(
+                new KnativeHttpClientCallback(camelExchange, callback, endpoint, request, bodyAsByte),
+                uri,
+                worker,
+                ssl,
+                pool,
+                options);
+        } catch (Exception e) {
+            // a failure raised before the request was handed over to the
+            // client would otherwise leave the callback un-notified: record
+            // it on the exchange and complete synchronously
+            camelExchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        // the call above proceeds on an Xnio I/O thread; the exchange is
+        // notified asynchronously from KnativeHttpClientCallback when the
+        // HTTP exchange ends with success or failure
+        return false;
+    }
+
+    /**
+     * Creates the buffer pool, the XNIO worker, the optional SSL support
+     * (only when the endpoint has SSL context parameters) and looks up the
+     * Undertow client.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final Xnio xnio = Xnio.getInstance();
+
+        pool = new DefaultByteBufferPool(true, 17 * 1024);
+        worker = xnio.createWorker(options);
+
+        SSLContextParameters sslContext = getEndpoint().getSslContextParameters();
+        if (sslContext != null) {
+            ssl = new UndertowXnioSsl(xnio, options, sslContext.createSSLContext(getEndpoint().getCamelContext()));
+        }
+
+        client = UndertowClient.getInstance();
+
+        LOGGER.debug("Created worker: {} with options: {}", worker, options);
+    }
+
+    /**
+     * Shuts down the XNIO worker and closes the buffer pool created in
+     * {@link #doStart()}.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (worker != null && !worker.isShutdown()) {
+            LOGGER.debug("Shutting down worker: {}", worker);
+            worker.shutdown();
+        }
+
+        // a new pool is created on every start, so release the current one
+        if (pool != null) {
+            pool.close();
+            pool = null;
+        }
+    }
+}
diff --git a/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/component/knative-http b/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/component/knative-http
deleted file mode 100644
index 439d764..0000000
--- a/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/component/knative-http
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-class=org.apache.camel.component.knative.http.KnativeHttpComponent
\ No newline at end of file
diff --git a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpMain.java b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpMain.java
deleted file mode 100644
index 5333a58..0000000
--- a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpMain.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-
-public final class KnativeHttpMain {
- private KnativeHttpMain() {
- }
-
- public static void main(String[] args) throws Exception {
- DefaultCamelContext context = new DefaultCamelContext();
-
- try {
- context.disableJMX();
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative-http:http://0.0.0.0:8080?filter.headerName=CE-Source&filter.headerValue=CH1")
- .convertBodyTo(String.class)
- .to("log:ch-11?showAll=true&multiline=true")
- .setBody().constant("Hello from CH1");
- from("knative-http:http://0.0.0.0:8080?filter.headerName=CE-Source&filter.headerValue=CH2")
- .convertBodyTo(String.class)
- .to("log:ch-2?showAll=true&multiline=true")
- .setBody().constant("Hello from CH2");
- }
- });
-
- context.start();
-
- Thread.sleep(Integer.MAX_VALUE);
- } finally {
- context.stop();
- }
-
- }
-}
diff --git a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
new file mode 100644
index 0000000..e0d1ab6
--- /dev/null
+++ b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.http.common.HttpOperationFailedException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for the knative-http component: consumer dispatching by path and by
+ * header filter, and producer invocation of a plain HTTP endpoint.
+ */
+public class KnativeHttpTest {
+
+    private CamelContext context;
+    private ProducerTemplate template;
+    private int port;
+
+    // **************************
+    //
+    // Setup
+    //
+    // **************************
+
+    @BeforeEach
+    public void before() {
+        this.context = new DefaultCamelContext();
+        this.template = this.context.createProducerTemplate();
+        this.port = AvailablePortFinder.getNextAvailable();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        if (this.context != null) {
+            this.context.stop();
+        }
+    }
+
+    // **************************
+    //
+    // Tests
+    //
+    // **************************
+
+    /**
+     * Two consumers on the same port but different paths must each receive
+     * only the requests sent to their own path.
+     */
+    @Test
+    void testWithPaths() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("knative-http:0.0.0.0:%d/a/1", port)
+                    .routeId("r1")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class)
+                    .to("mock:r1");
+                fromF("knative-http:0.0.0.0:%d/a/2", port)
+                    .routeId("r2")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class)
+                    .to("mock:r2");
+                from("direct:start")
+                    .toD("undertow:http://localhost:" + port + "/a/${body}");
+            }
+        }
+        );
+
+        MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class);
+        m1.expectedMessageCount(1);
+        m1.expectedBodiesReceived("r1");
+
+        MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class);
+        m2.expectedMessageCount(1);
+        m2.expectedBodiesReceived("r2");
+
+        context.start();
+
+        assertThat(
+            template.requestBody("direct:start", "1", String.class)
+        ).isEqualTo("r1");
+        assertThat(
+            template.requestBody("direct:start", "2", String.class)
+        ).isEqualTo("r2");
+
+        m1.assertIsSatisfied();
+        m2.assertIsSatisfied();
+    }
+
+    /**
+     * Three consumers on the same port are selected by the value of the
+     * MyHeader header; every route, including r3, is verified through its
+     * mock endpoint.
+     */
+    @Test
+    void testWithFilters() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port)
+                    .routeId("r1")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class)
+                    .to("mock:r1");
+                fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port)
+                    .routeId("r2")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class)
+                    .to("mock:r2");
+                fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port)
+                    .routeId("r3")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class)
+                    .to("mock:r3");
+                from("direct:start")
+                    .setHeader("MyHeader").body()
+                    .toF("undertow:http://localhost:%d", port);
+            }
+        }
+        );
+
+        MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class);
+        m1.expectedMessageCount(1);
+        m1.expectedBodiesReceived("r1");
+
+        MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class);
+        m2.expectedMessageCount(1);
+        m2.expectedBodiesReceived("r2");
+
+        // r3 is hit twice below (t1 and t2), so it must see two messages
+        MockEndpoint m3 = context.getEndpoint("mock:r3", MockEndpoint.class);
+        m3.expectedMessageCount(2);
+        m3.expectedBodiesReceived("r3", "r3");
+
+        context.start();
+
+        assertThat(
+            template.requestBody("direct:start", "h1", String.class)
+        ).isEqualTo("r1");
+        assertThat(
+            template.requestBody("direct:start", "h2", String.class)
+        ).isEqualTo("r2");
+        assertThat(
+            template.requestBody("direct:start", "t1", String.class)
+        ).isEqualTo("r3");
+        assertThat(
+            template.requestBody("direct:start", "t2", String.class)
+        ).isEqualTo("r3");
+
+        m1.assertIsSatisfied();
+        m2.assertIsSatisfied();
+        m3.assertIsSatisfied();
+    }
+
+    /**
+     * A request whose header value matches the filter pattern is served,
+     * while a non-matching one makes the HTTP call fail on the client side.
+     */
+    @Test
+    void testWithRexFilters() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port)
+                    .routeId("r1")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class);
+                from("direct:start")
+                    .setHeader("MyHeader").body()
+                    .toF("undertow:http://localhost:%d", port);
+            }
+        }
+        );
+
+        context.start();
+
+        assertThat(
+            template.requestBody("direct:start", "h1", String.class)
+        ).isEqualTo("r1");
+        assertThatThrownBy(
+            () -> template.requestBody("direct:start", "t1", String.class)
+        ).isInstanceOf(CamelExecutionException.class).hasCauseExactlyInstanceOf(HttpOperationFailedException.class);
+    }
+
+    /**
+     * The knative-http producer must reach a plain undertow consumer and
+     * send a Host header that contains the host only.
+     */
+    @Test
+    void testInvokeEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("undertow:http://0.0.0.0:%d", port)
+                    .routeId("endpoint")
+                    .setBody().simple("${routeId}")
+                    .convertBodyTo(String.class)
+                    .to("mock:endpoint");
+                from("direct:start")
+                    .toF("knative-http:0.0.0.0:%d", port);
+            }
+        }
+        );
+
+        MockEndpoint mock = context.getEndpoint("mock:endpoint", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("endpoint");
+        mock.expectedHeaderReceived("Host", "0.0.0.0");
+
+        context.start();
+
+        template.requestBody("direct:start", "1", String.class);
+
+        mock.assertIsSatisfied();
+    }
+}
+
diff --git a/camel-knative/pom.xml b/camel-knative/pom.xml
index 1dad133..50f6cd0 100644
--- a/camel-knative/pom.xml
+++ b/camel-knative/pom.xml
@@ -53,12 +53,7 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-netty4-http</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-undertow</artifactId>
+ <artifactId>camel-cloud</artifactId>
<scope>provided</scope>
</dependency>
@@ -142,6 +137,11 @@
<artifactId>camel-properties</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-undertow</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java b/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
index 646eb86..605ac14 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
+++ b/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
public final class Knative {
public static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new Jdk8Module());
+ public static final int DEFAULT_HTTP_PORT = 80;
+
public static final String HTTP_COMPONENT = "knative-http";
public static final String KNATIVE_PROTOCOL = "knative.protocol";
public static final String KNATIVE_TYPE = "knative.type";
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index cf10277..fa2405d 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -21,11 +21,13 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.IntrospectionSupport;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.util.StringHelper;
+@Component("knative")
public class KnativeComponent extends DefaultComponent {
public static final String CONFIGURATION_ENV_VARIABLE = "CAMEL_KNATIVE_CONFIGURATION";
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index dc12cd0..31db9ee 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -150,12 +150,13 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
private static Endpoint http(CamelContext context, ServiceDefinition definition, Map<String, Object> transportOptions) {
try {
- final String scheme = Knative.HTTP_COMPONENT;
- final String protocol = definition.getMetadata().getOrDefault(Knative.KNATIVE_PROTOCOL, "http");
-
+ String scheme = Knative.HTTP_COMPONENT;
String host = definition.getHost();
int port = definition.getPort();
+ if (port == -1) {
+ port = Knative.DEFAULT_HTTP_PORT;
+ }
if (ObjectHelper.isEmpty(host)) {
String name = definition.getName();
String zone = definition.getMetadata().get(Knative.SERVICE_META_ZONE);
@@ -175,13 +176,8 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
}
ObjectHelper.notNull(host, Knative.SERVICE_META_HOST);
- ObjectHelper.notNull(protocol, Knative.KNATIVE_PROTOCOL);
-
- String uri = String.format("%s:%s://%s", scheme, protocol, host);
- if (port != -1) {
- uri = uri + ":" + port;
- }
+ String uri = String.format("%s:%s:%s", scheme, host, port);
String path = definition.getMetadata().get(Knative.SERVICE_META_PATH);
if (path != null) {
if (!path.startsWith("/")) {
@@ -198,14 +194,9 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
parameters.putAll(transportOptions);
if (ObjectHelper.isNotEmpty(filterKey) && ObjectHelper.isNotEmpty(filterVal)) {
- parameters.put("filter.headerName", filterKey);
- parameters.put("filter.headerValue", filterVal);
+ parameters.put("filter." + filterKey, filterVal);
}
- // configure netty to use relative path instead of full
- // path that is the default to make istio working
- //parameters.put("useRelativePath", "true");
-
uri = URISupport.appendParametersToURI(uri, parameters);
return context.getEndpoint(uri);
diff --git a/camel-knative/src/main/resources/META-INF/services/org/apache/camel/component/knative b/camel-knative/src/main/resources/META-INF/services/org/apache/camel/component/knative
deleted file mode 100644
index 3d68812..0000000
--- a/camel-knative/src/main/resources/META-INF/services/org/apache/camel/component/knative
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-class=org.apache.camel.component.knative.KnativeComponent
\ No newline at end of file
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
index 189c79d..1376c92 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
@@ -88,7 +88,7 @@ public class CloudEventsV01Test {
from("direct:source")
.to("knative:endpoint/myEndpoint");
- fromF("netty4-http:http://localhost:%d/a/path", port)
+ fromF("undertow:http://localhost:%d/a/path", port)
.to("mock:ce");
}
});
@@ -155,10 +155,10 @@ public class CloudEventsV01Test {
from("direct:source2")
.to("knative:endpoint/myEndpoint2?cloudEventsType=my.type");
- fromF("netty4-http:http://localhost:%d/", port)
+ fromF("undertow:http://localhost:%d/", port)
.to("mock:ce");
- fromF("netty4-http:http://localhost:%d/2", port)
+ fromF("undertow:http://localhost:%d/2", port)
.to("mock:ce2");
}
});
@@ -231,7 +231,7 @@ public class CloudEventsV01Test {
.to("mock:ce");
from("direct:source")
- .toF("netty4-http:http://localhost:%d/a/path", port);
+ .toF("undertow:http://localhost:%d/a/path", port);
}
});
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
index 54dc2d0..75bfc0f 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
@@ -88,7 +88,7 @@ public class CloudEventsV02Test {
from("direct:source")
.to("knative:endpoint/myEndpoint");
- fromF("netty4-http:http://localhost:%d/a/path", port)
+ fromF("undertow:http://localhost:%d/a/path", port)
.to("mock:ce");
}
});
@@ -155,10 +155,10 @@ public class CloudEventsV02Test {
from("direct:source2")
.to("knative:endpoint/myEndpoint2?cloudEventsType=my.type");
- fromF("netty4-http:http://localhost:%d/", port)
+ fromF("undertow:http://localhost:%d/", port)
.to("mock:ce");
- fromF("netty4-http:http://localhost:%d/2", port)
+ fromF("undertow:http://localhost:%d/2", port)
.to("mock:ce2");
}
});
@@ -231,7 +231,7 @@ public class CloudEventsV02Test {
.to("mock:ce");
from("direct:source")
- .toF("netty4-http:http://localhost:%d/a/path", port);
+ .toF("undertow:http://localhost:%d/a/path", port);
}
});
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
index a8a2c92..da81f6a 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
@@ -27,8 +27,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.knative.ce.CloudEventsProcessors;
+import org.apache.camel.component.knative.http.KnativeHttpEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.netty4.NettyEndpoint;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultHeaderFilterStrategy;
@@ -140,8 +140,8 @@ public class KnativeComponentTest {
assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://my-node:9001/a/path");
+ assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://my-node:9001/a/path");
//
// Endpoint with context path overridden by endpoint uri
@@ -157,8 +157,8 @@ public class KnativeComponentTest {
assertThat(e2.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
assertThat(e2.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e2.getService()).hasFieldOrPropertyWithValue("path", "/another/path");
- assertThat(e2.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://my-node:9001/another/path");
+ assertThat(e2.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://my-node:9001/another/path");
}
@Test
@@ -190,8 +190,8 @@ public class KnativeComponentTest {
assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://myEndpoint/a/path");
+ assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint:80/a/path");
}
@Test
@@ -225,8 +225,8 @@ public class KnativeComponentTest {
assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://myEndpoint.myNamespace/a/path");
+ assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint.myNamespace:80/a/path");
}
@Test
@@ -266,8 +266,8 @@ public class KnativeComponentTest {
assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://myEndpoint.myNamespace/a/path");
+ assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint.myNamespace:80/a/path");
}
@Test
@@ -291,8 +291,8 @@ public class KnativeComponentTest {
assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/my/path");
- assertThat(e1.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://myEndpoint/my/path");
+ assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint:80/my/path");
//
// Endpoint with context path overridden by endpoint uri
@@ -306,8 +306,8 @@ public class KnativeComponentTest {
assertThat(e2.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.channel);
assertThat(e2.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
assertThat(e2.getService()).hasFieldOrPropertyWithValue("path", "/another/path");
- assertThat(e2.getEndpoint()).isInstanceOf(NettyEndpoint.class);
- assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://myChannel-channel/another/path");
+ assertThat(e2.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
+ assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myChannel-channel:80/another/path");
}
@@ -340,7 +340,7 @@ public class KnativeComponentTest {
from("direct:source")
.to("knative:endpoint/myEndpoint");
- fromF("netty4-http:http://localhost:%d/a/path", port)
+ fromF("undertow:http://localhost:%d/a/path", port)
.to("mock:ce");
}
});
@@ -396,7 +396,7 @@ public class KnativeComponentTest {
.to("mock:ce");
from("direct:source")
- .toF("netty4-http:http://localhost:%d/a/path", port);
+ .toF("undertow:http://localhost:%d/a/path", port);
}
});
@@ -545,7 +545,7 @@ public class KnativeComponentTest {
.setHeader(Exchange.HTTP_METHOD)
.constant("POST")
.setHeader(Exchange.HTTP_QUERY)
- .simple("filter.headerName=CE-Source&filter.headerValue=${header.FilterVal}")
+ .simple("filter.CE-Source=${header.FilterVal}")
.toD("http4://localhost:" + port);
}
});
@@ -629,7 +629,7 @@ public class KnativeComponentTest {
.to("knative:endpoint/myEndpoint")
.to("mock:source");
- fromF("netty4-http:http://localhost:%d", port)
+ fromF("undertow:http://localhost:%d", port)
.setBody().constant("test");
}
});
@@ -687,7 +687,7 @@ public class KnativeComponentTest {
.to("knative:endpoint/myEndpoint?transport.headerFilterStrategy=#myFilterStrategy")
.to("mock:source");
- fromF("netty4-http:http://localhost:%d?headerFilterStrategy=#myFilterStrategy", port)
+ fromF("undertow:http://localhost:%d?headerFilterStrategy=#myFilterStrategy", port)
.setBody().constant("test");
}
});