You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/03/21 05:32:26 UTC
[camel-k-runtime] branch master updated: fix: add custom netty http
binding to workaround CAMEL-13351
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new 654b2dd fix: add custom netty http binding to workaround CAMEL-13351
654b2dd is described below
commit 654b2dd3c649cf1f104ae7efbe4be38529d18051
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Mar 20 14:47:31 2019 +0100
fix: add custom netty http binding to workaround CAMEL-13351
The issue will be fixed upstream by #2832 in the Apache Camel
repository, but as we support multiple Camel versions we still
need this workaround to be sure the fix is applied.
---
.../java/org/apache/camel/k/adapter/Objects.java | 27 ++++
.../java/org/apache/camel/k/adapter/Objects.java | 27 ++++
.../knative/http/KnativeHttpComponent.java | 170 +++++++++++++++++++--
3 files changed, 212 insertions(+), 12 deletions(-)
diff --git a/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java b/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java
new file mode 100644
index 0000000..00e0bc5
--- /dev/null
+++ b/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.adapter;
+
+import java.util.Iterator;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Adapter around the {@code org.apache.camel.util.ObjectHelper} used by this module, so that
+ * callers can depend on {@code org.apache.camel.k.adapter.Objects} instead of the Camel class
+ * directly.
+ */
+public final class Objects {
+    private Objects() {
+        // static utility class, not meant to be instantiated
+    }
+
+    /**
+     * Delegates to {@code ObjectHelper.createIterator(value, delimiter, allowEmptyValues)};
+     * all arguments are passed through unchanged.
+     *
+     * @param value the object to iterate over
+     * @param delimiter the delimiter handed to the helper (callers in this commit pass {@code null})
+     * @param allowEmptyValues flag handed to the helper as-is
+     * @return the iterator produced by the Camel helper
+     */
+    public static Iterator<?> createIterator(Object value, String delimiter, boolean allowEmptyValues) {
+        return ObjectHelper.createIterator(value, delimiter, allowEmptyValues);
+    }
+}
diff --git a/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java b/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java
new file mode 100644
index 0000000..1911e4e
--- /dev/null
+++ b/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.adapter;
+
+import java.util.Iterator;
+
+import org.apache.camel.support.ObjectHelper;
+
+/**
+ * Adapter around the {@code org.apache.camel.support.ObjectHelper} used by this module, so that
+ * callers can depend on {@code org.apache.camel.k.adapter.Objects} instead of the Camel class
+ * directly.
+ */
+public final class Objects {
+    private Objects() {
+        // static utility class, not meant to be instantiated
+    }
+
+    /**
+     * Delegates to {@code ObjectHelper.createIterator(value, delimiter, allowEmptyValues)};
+     * all arguments are passed through unchanged.
+     *
+     * @param value the object to iterate over
+     * @param delimiter the delimiter handed to the helper (callers in this commit pass {@code null})
+     * @param allowEmptyValues flag handed to the helper as-is
+     * @return the iterator produced by the Camel helper
+     */
+    public static Iterator<?> createIterator(Object value, String delimiter, boolean allowEmptyValues) {
+        return ObjectHelper.createIterator(value, delimiter, allowEmptyValues);
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
index d0567c7..301c181 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
@@ -18,29 +18,47 @@ package org.apache.camel.component.knative.http;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.netty4.NettyConverter;
+import org.apache.camel.component.netty4.http.DefaultNettyHttpBinding;
import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory;
+import org.apache.camel.component.netty4.http.NettyHttpBinding;
import org.apache.camel.component.netty4.http.NettyHttpComponent;
+import org.apache.camel.component.netty4.http.NettyHttpConfiguration;
import org.apache.camel.component.netty4.http.NettyHttpConsumer;
+import org.apache.camel.component.netty4.http.NettyHttpHelper;
import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
import org.apache.camel.http.common.CamelServlet;
import org.apache.camel.k.adapter.Exceptions;
+import org.apache.camel.k.adapter.Objects;
import org.apache.camel.k.adapter.Services;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.RestConsumerContextPathMatcher;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
@@ -53,8 +71,14 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class KnativeHttpComponent extends NettyHttpComponent {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class);
private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>();
+    /**
+     * Creates the component and installs a {@link KnativeNettyHttpBinding}, built from the
+     * component's current header filter strategy, as its Netty HTTP binding.
+     */
+    public KnativeHttpComponent() {
+        // the implicit super() call runs first, so the header filter strategy is read afterwards
+        final HeaderFilterStrategy filterStrategy = getHeaderFilterStrategy();
+        setNettyHttpBinding(new KnativeNettyHttpBinding(filterStrategy));
+    }
+
@Override
public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) {
return handlers.computeIfAbsent(port, Handler::new);
@@ -68,6 +92,11 @@ public class KnativeHttpComponent extends NettyHttpComponent {
handlers.clear();
}
+ /**
+  * Creates the endpoint by delegating directly to the parent component; this override adds
+  * no behavior of its own.
+  */
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ return super.createEndpoint(uri, remaining, parameters);
+ }
+
@ChannelHandler.Sharable
private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory {
private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
@@ -190,18 +219,6 @@ public class KnativeHttpComponent extends NettyHttpComponent {
// use the path as key to find the consumer handler to use
path = pathAsKey(path);
- /*
- List<RestConsumerContextPathMatcher.ConsumerPath> paths = new ArrayList<>();
- for (final HttpServerChannelHandler handler : consumers) {
- paths.add(new HttpRestConsumerPath(handler));
- }
-
- RestConsumerContextPathMatcher.ConsumerPath<HttpServerChannelHandler> best = RestConsumerContextPathMatcher.matchBestPath(method, path, paths);
- if (best != null) {
- answer = best.getConsumer();
- }
- */
-
// fallback to regular matching
if (answer == null) {
for (final HttpServerChannelHandler handler : consumers) {
@@ -261,4 +278,133 @@ public class KnativeHttpComponent extends NettyHttpComponent {
}
}
+
+
+    /**
+     * {@link NettyHttpBinding} installed by {@link KnativeHttpComponent} as a workaround for
+     * CAMEL-13351.
+     * <p>
+     * It extends {@link DefaultNettyHttpBinding} and overrides only the creation of outgoing
+     * Netty requests. The well-known headers (Content-Length, Content-Type, Host, Connection)
+     * are set using the {@link String} form of the {@link HttpHeaderNames} constants.
+     */
+    public class KnativeNettyHttpBinding extends DefaultNettyHttpBinding {
+        public KnativeNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
+            super(headerFilterStrategy);
+        }
+
+        /**
+         * Builds the Netty {@link HttpRequest} to send for the given Camel message.
+         *
+         * @param message the Camel message providing body and headers
+         * @param uri the target URI; also used to compute the {@code Host} header
+         * @param configuration endpoint configuration (relative path, bridge endpoint, keep-alive)
+         * @return the message body itself when it already is an {@link HttpRequest}, otherwise a
+         *         newly created request
+         * @throws Exception if the URI cannot be parsed or the body cannot be converted
+         */
+        @Override
+        public HttpRequest toNettyRequest(Message message, String uri, NettyHttpConfiguration configuration) throws Exception {
+            LOGGER.trace("toNettyRequest: {}", message);
+
+            // the message body may already be a Netty HTTP request
+            if (message.getBody() instanceof HttpRequest) {
+                return (HttpRequest) message.getBody();
+            }
+
+            String uriForRequest = uri;
+            if (configuration.isUseRelativePath()) {
+                uriForRequest = URISupport.pathAndQueryOf(new URI(uriForRequest));
+            }
+
+            // just assume GET for now, we will later change that to the actual method to use
+            HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriForRequest);
+
+            Object body = message.getBody();
+            if (body != null) {
+                // support bodies as native Netty
+                ByteBuf buffer;
+                if (body instanceof ByteBuf) {
+                    buffer = (ByteBuf) body;
+                } else {
+                    // try to convert to buffer first
+                    buffer = message.getBody(ByteBuf.class);
+                    if (buffer == null) {
+                        // fallback to byte array as last resort
+                        byte[] data = message.getMandatoryBody(byte[].class);
+                        buffer = NettyConverter.toByteBuffer(data);
+                    }
+                }
+                if (buffer != null) {
+                    request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriForRequest, buffer);
+                    int len = buffer.readableBytes();
+                    // set content-length
+                    request.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len);
+                    LOGGER.trace("Content-Length: {}", len);
+                } else {
+                    // we do not support this kind of body
+                    throw new NoTypeConversionAvailableException(body, ByteBuf.class);
+                }
+            }
+
+            // update the HTTP method now that we know whether we have a body or not
+            HttpMethod method = NettyHttpHelper.createMethod(message, body != null);
+            request.setMethod(method);
+
+            TypeConverter tc = message.getExchange().getContext().getTypeConverter();
+
+            // if we bridge the endpoint then we need to skip headers matching the HTTP_QUERY parameters
+            // to avoid sending duplicated headers to the receiver; skipRequestHeaders is the list of
+            // headers to skip
+            Map<String, Object> skipRequestHeaders = null;
+            if (configuration.isBridgeEndpoint()) {
+                String queryString = message.getHeader(Exchange.HTTP_QUERY, String.class);
+                if (queryString != null) {
+                    skipRequestHeaders = URISupport.parseQuery(queryString, false, true);
+                }
+                // need to remove the Host key as it should not be used
+                message.getHeaders().remove("host");
+            }
+
+            // append headers
+            // must use entrySet to ensure case of keys is preserved
+            for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+                String key = entry.getKey();
+                Object value = entry.getValue();
+
+                // we should not add headers for the parameters in the uri if we bridge the endpoint
+                // as then we would duplicate headers on both the endpoint uri, and in HTTP headers as well
+                if (skipRequestHeaders != null && skipRequestHeaders.containsKey(key)) {
+                    continue;
+                }
+
+                // use an iterator as there can be multiple values. (must not use a delimiter)
+                final Iterator<?> it = Objects.createIterator(value, null, true);
+                while (it.hasNext()) {
+                    String headerValue = tc.convertTo(String.class, it.next());
+
+                    if (headerValue != null && getHeaderFilterStrategy() != null
+                        && !getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+                        LOGGER.trace("HTTP-Header: {}={}", key, headerValue);
+                        request.headers().add(key, headerValue);
+                    }
+                }
+            }
+
+            // set the content type in the request
+            String contentType = message.getHeader(Exchange.CONTENT_TYPE, String.class);
+            if (contentType != null) {
+                // set content-type
+                request.headers().set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType);
+                LOGGER.trace("Content-Type: {}", contentType);
+            }
+
+            // must include HOST header as required by HTTP 1.1
+            // use URI as it's faster than URL (no DNS lookup)
+            URI u = new URI(uri);
+            int port = u.getPort();
+            // URI.getPort() returns -1 when the URI has no explicit port; in that case (and for the
+            // default port 80) the header must carry the host only, never "host:-1"
+            String hostHeader = u.getHost() + (port == 80 || port == -1 ? "" : ":" + port);
+            request.headers().set(HttpHeaderNames.HOST.toString(), hostHeader);
+            LOGGER.trace("Host: {}", hostHeader);
+
+            // configure the connection header according to the keep-alive configuration,
+            // favoring the header from the message
+            String connection = message.getHeader(HttpHeaderNames.CONNECTION.toString(), String.class);
+            if (connection == null) {
+                // fallback and use the keep alive from the configuration
+                if (configuration.isKeepAlive()) {
+                    connection = HttpHeaderValues.KEEP_ALIVE.toString();
+                } else {
+                    connection = HttpHeaderValues.CLOSE.toString();
+                }
+            }
+            request.headers().set(HttpHeaderNames.CONNECTION.toString(), connection);
+            LOGGER.trace("Connection: {}", connection);
+
+            return request;
+        }
+    }
}