You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/03/21 05:32:26 UTC

[camel-k-runtime] branch master updated: fix: add custom netty http binding to workaround CAMEL-13351

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new 654b2dd  fix: add custom netty http binding to workaround CAMEL-13351
654b2dd is described below

commit 654b2dd3c649cf1f104ae7efbe4be38529d18051
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Mar 20 14:47:31 2019 +0100

    fix: add custom netty http binding to workaround CAMEL-13351
    
    The issue will be fixed upstream by the #2832 on apache camel
    repository but as we support multiple camel version we still
    need to have this workaround to be sure the fix is applied.
---
 .../java/org/apache/camel/k/adapter/Objects.java   |  27 ++++
 .../java/org/apache/camel/k/adapter/Objects.java   |  27 ++++
 .../knative/http/KnativeHttpComponent.java         | 170 +++++++++++++++++++--
 3 files changed, 212 insertions(+), 12 deletions(-)

diff --git a/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java b/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java
new file mode 100644
index 0000000..00e0bc5
--- /dev/null
+++ b/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.adapter;
+
+import java.util.Iterator;
+
+import org.apache.camel.util.ObjectHelper;
+
+public final class Objects {
+    public static Iterator<?> createIterator(Object value, String delimiter, boolean allowEmptyValues) {
+        return ObjectHelper.createIterator(value, delimiter, allowEmptyValues);
+    }
+}
diff --git a/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java b/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java
new file mode 100644
index 0000000..1911e4e
--- /dev/null
+++ b/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.adapter;
+
+import java.util.Iterator;
+
+import org.apache.camel.support.ObjectHelper;
+
+public final class Objects {
+    public static Iterator<?> createIterator(Object value, String delimiter, boolean allowEmptyValues) {
+        return ObjectHelper.createIterator(value, delimiter, allowEmptyValues);
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
index d0567c7..301c181 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
@@ -18,29 +18,47 @@ package org.apache.camel.component.knative.http;
 
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.netty4.NettyConverter;
+import org.apache.camel.component.netty4.http.DefaultNettyHttpBinding;
 import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory;
+import org.apache.camel.component.netty4.http.NettyHttpBinding;
 import org.apache.camel.component.netty4.http.NettyHttpComponent;
+import org.apache.camel.component.netty4.http.NettyHttpConfiguration;
 import org.apache.camel.component.netty4.http.NettyHttpConsumer;
+import org.apache.camel.component.netty4.http.NettyHttpHelper;
 import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
 import org.apache.camel.http.common.CamelServlet;
 import org.apache.camel.k.adapter.Exceptions;
+import org.apache.camel.k.adapter.Objects;
 import org.apache.camel.k.adapter.Services;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.RestConsumerContextPathMatcher;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
@@ -53,8 +71,14 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 public class KnativeHttpComponent extends NettyHttpComponent {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class);
     private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>();
 
+    public KnativeHttpComponent() {
+        super();
+        setNettyHttpBinding(new KnativeNettyHttpBinding(getHeaderFilterStrategy()));
+    }
+
     @Override
     public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) {
         return handlers.computeIfAbsent(port, Handler::new);
@@ -68,6 +92,11 @@ public class KnativeHttpComponent extends NettyHttpComponent {
         handlers.clear();
     }
 
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        return super.createEndpoint(uri, remaining, parameters);
+    }
+
     @ChannelHandler.Sharable
     private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory {
         private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
@@ -190,18 +219,6 @@ public class KnativeHttpComponent extends NettyHttpComponent {
             // use the path as key to find the consumer handler to use
             path = pathAsKey(path);
 
-            /*
-            List<RestConsumerContextPathMatcher.ConsumerPath> paths = new ArrayList<>();
-            for (final HttpServerChannelHandler handler : consumers) {
-                paths.add(new HttpRestConsumerPath(handler));
-            }
-
-            RestConsumerContextPathMatcher.ConsumerPath<HttpServerChannelHandler> best = RestConsumerContextPathMatcher.matchBestPath(method, path, paths);
-            if (best != null) {
-                answer = best.getConsumer();
-            }
-            */
-
             // fallback to regular matching
             if (answer == null) {
                 for (final HttpServerChannelHandler handler : consumers) {
@@ -261,4 +278,133 @@ public class KnativeHttpComponent extends NettyHttpComponent {
         }
 
     }
+
+
+    /**
+     * Default {@link NettyHttpBinding}.
+     */
+    public class KnativeNettyHttpBinding extends DefaultNettyHttpBinding {
+        public KnativeNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
+            super(headerFilterStrategy);
+        }
+
+        @Override
+        public HttpRequest toNettyRequest(Message message, String uri, NettyHttpConfiguration configuration) throws Exception {
+            LOGGER.trace("toNettyRequest: {}", message);
+
+            // the message body may already be a Netty HTTP response
+            if (message.getBody() instanceof HttpRequest) {
+                return (HttpRequest) message.getBody();
+            }
+
+            String uriForRequest = uri;
+            if (configuration.isUseRelativePath()) {
+                uriForRequest = URISupport.pathAndQueryOf(new URI(uriForRequest));
+            }
+
+            // just assume GET for now, we will later change that to the actual method to use
+            HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriForRequest);
+
+            Object body = message.getBody();
+            if (body != null) {
+                // support bodies as native Netty
+                ByteBuf buffer;
+                if (body instanceof ByteBuf) {
+                    buffer = (ByteBuf) body;
+                } else {
+                    // try to convert to buffer first
+                    buffer = message.getBody(ByteBuf.class);
+                    if (buffer == null) {
+                        // fallback to byte array as last resort
+                        byte[] data = message.getMandatoryBody(byte[].class);
+                        buffer = NettyConverter.toByteBuffer(data);
+                    }
+                }
+                if (buffer != null) {
+                    request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriForRequest, buffer);
+                    int len = buffer.readableBytes();
+                    // set content-length
+                    request.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len);
+                    LOGGER.trace("Content-Length: {}", len);
+                } else {
+                    // we do not support this kind of body
+                    throw new NoTypeConversionAvailableException(body, ByteBuf.class);
+                }
+            }
+
+            // update HTTP method accordingly as we know if we have a body or not
+            HttpMethod method = NettyHttpHelper.createMethod(message, body != null);
+            request.setMethod(method);
+
+            TypeConverter tc = message.getExchange().getContext().getTypeConverter();
+
+            // if we bridge endpoint then we need to skip matching headers with the HTTP_QUERY to avoid sending
+            // duplicated headers to the receiver, so use this skipRequestHeaders as the list of headers to skip
+            Map<String, Object> skipRequestHeaders = null;
+            if (configuration.isBridgeEndpoint()) {
+                String queryString = message.getHeader(Exchange.HTTP_QUERY, String.class);
+                if (queryString != null) {
+                    skipRequestHeaders = URISupport.parseQuery(queryString, false, true);
+                }
+                // Need to remove the Host key as it should be not used
+                message.getHeaders().remove("host");
+            }
+
+            // append headers
+            // must use entrySet to ensure case of keys is preserved
+            for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+                String key = entry.getKey();
+                Object value = entry.getValue();
+
+                // we should not add headers for the parameters in the uri if we bridge the endpoint
+                // as then we would duplicate headers on both the endpoint uri, and in HTTP headers as well
+                if (skipRequestHeaders != null && skipRequestHeaders.containsKey(key)) {
+                    continue;
+                }
+
+                // use an iterator as there can be multiple values. (must not use a delimiter)
+                final Iterator<?> it = Objects.createIterator(value, null, true);
+                while (it.hasNext()) {
+                    String headerValue = tc.convertTo(String.class, it.next());
+
+                    if (headerValue != null && getHeaderFilterStrategy() != null
+                        && !getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+                        LOGGER.trace("HTTP-Header: {}={}", key, headerValue);
+                        request.headers().add(key, headerValue);
+                    }
+                }
+            }
+
+            // set the content type in the response.
+            String contentType = message.getHeader(Exchange.CONTENT_TYPE, String.class);
+            if (contentType != null) {
+                // set content-type
+                request.headers().set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType);
+                LOGGER.trace("Content-Type: {}", contentType);
+            }
+
+            // must include HOST header as required by HTTP 1.1
+            // use URI as its faster than URL (no DNS lookup)
+            URI u = new URI(uri);
+            String hostHeader = u.getHost() + (u.getPort() == 80 ? "" : ":" + u.getPort());
+            request.headers().set(HttpHeaderNames.HOST.toString(), hostHeader);
+            LOGGER.trace("Host: {}", hostHeader);
+
+            // configure connection to accordingly to keep alive configuration
+            // favor using the header from the message
+            String connection = message.getHeader(HttpHeaderNames.CONNECTION.toString(), String.class);
+            if (connection == null) {
+                // fallback and use the keep alive from the configuration
+                if (configuration.isKeepAlive()) {
+                    connection = HttpHeaderValues.KEEP_ALIVE.toString();
+                } else {
+                    connection = HttpHeaderValues.CLOSE.toString();
+                }
+            }
+            request.headers().set(HttpHeaderNames.CONNECTION.toString(), connection);
+            LOGGER.trace("Connection: {}", connection);
+
+            return request;
+        }
+    }
 }