You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by GitBox <gi...@apache.org> on 2018/11/22 10:49:02 UTC

[GitHub] nicolaferraro closed pull request #234: runtime: add a netty4-http based knative component

nicolaferraro closed pull request #234: runtime: add a netty4-http based knative component
URL: https://github.com/apache/camel-k/pull/234
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays
its diff once the request has been merged, so the full diff is reproduced below:

diff --git a/runtime/camel-knative-http/pom.xml b/runtime/camel-knative-http/pom.xml
new file mode 100644
index 00000000..b20b8955
--- /dev/null
+++ b/runtime/camel-knative-http/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.k</groupId>
+        <artifactId>camel-k-runtime-parent</artifactId>
+        <version>0.0.6-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>camel-knative-http</artifactId>
+
+    <dependencies>
+
+        <!-- ****************************** -->
+        <!--                                -->
+        <!-- RUNTIME                        -->
+        <!--                                -->
+        <!-- ****************************** -->
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-netty4-http</artifactId>
+        </dependency>
+
+        <!-- ****************************** -->
+        <!--                                -->
+        <!-- TESTS                          -->
+        <!--                                -->
+        <!-- ****************************** -->
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>${junit-jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>${junit-jupiter.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j2.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j2.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/runtime/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/runtime/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
new file mode 100644
index 00000000..82fd12b0
--- /dev/null
+++ b/runtime/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+/** Non-instantiable holder: the only constructor is private and the class declares nothing else. */
+public final class KnativeHttp {
+    private KnativeHttp() {
+    }
+}
diff --git a/runtime/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/runtime/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
new file mode 100644
index 00000000..72871225
--- /dev/null
+++ b/runtime/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.net.URI;
+import java.nio.channels.ClosedChannelException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory;
+import org.apache.camel.component.netty4.http.NettyHttpComponent;
+import org.apache.camel.component.netty4.http.NettyHttpConsumer;
+import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
+import org.apache.camel.http.common.CamelServlet;
+import org.apache.camel.support.RestConsumerContextPathMatcher;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.URISupport;
+import org.apache.camel.util.UnsafeUriCharactersEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class KnativeHttpComponent extends NettyHttpComponent {
+    private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>();
+    /** Returns the channel factory cached for the given port, creating and caching a new Handler on first use. */
+    @Override
+    public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) {
+        return handlers.computeIfAbsent(port, p -> new Handler(p));
+    }
+    /** Runs the parent shutdown first, then passes every cached handler to ServiceHelper.stopService and empties the cache. */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        ServiceHelper.stopService(handlers.values());
+        handlers.clear();
+    }
+
+    @ChannelHandler.Sharable
+    private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory {
+        private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
+        private static final AttributeKey<HttpServerChannelHandler> SERVER_HANDLER_KEY = AttributeKey.valueOf("serverHandler");
+
+        private final Set<HttpServerChannelHandler> consumers;
+        private final int port;
+        private final String token;
+        private final int len;
+        /** Creates the handler for one port and precomputes the ":port" token (and its length) used when trimming request URIs. */
+        public Handler(int port) {
+            this.consumers = new CopyOnWriteArraySet<>();
+            this.port = port;
+            this.token = ":" + port;
+            this.len = token.length();
+        }
+        /** No-op: the port is already set by the constructor. */
+        public void init(int port) {
+        }
+        /** Wraps the consumer in a new HttpServerChannelHandler and adds it to the set of routing candidates. */
+        public void addConsumer(NettyHttpConsumer consumer) {
+            consumers.add(new HttpServerChannelHandler(consumer));
+        }
+        /** Removes every registered handler that wraps this exact consumer instance (reference comparison). */
+        public void removeConsumer(NettyHttpConsumer consumer) {
+            consumers.removeIf(h -> h.getConsumer() == consumer);
+        }
+        /** Returns the number of currently registered consumer handlers. */
+        public int consumers() {
+            return consumers.size();
+        }
+        /** Returns the port this handler was created for. */
+        public int getPort() {
+            return port;
+        }
+        /** Returns this instance, which is itself the channel handler. */
+        public ChannelHandler getChannelHandler() {
+            return this;
+        }
+        /** Hands an inbound request to the consumer selected by getHandler, or replies with an empty 404/405 and closes the channel. */
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+            // every inbound message is cast to HttpRequest; any other message type fails right here
+            HttpRequest request = (HttpRequest) msg;
+
+            LOG.debug("Message received: {}", request);
+
+            HttpServerChannelHandler handler = getHandler(request, request.method().name());
+            if (handler != null) {
+                Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY);
+                // store handler as attachment so that exceptionCaught can delegate to it later
+                attr.set(handler);
+                if (msg instanceof HttpContent) {
+                    // need to hold the reference of content (NOTE(review): presumably because msg is released once this method returns - confirm)
+                    HttpContent httpContent = (HttpContent) msg;
+                    httpContent.content().retain();
+                }
+                handler.channelRead(ctx, request);
+            } else {
+                // okay we cannot process this request so return either 404 or 405.
+                // to know if it's 405 we need to check if any other HTTP method would have a consumer for the "same" request
+                boolean hasAnyMethod = CamelServlet.METHODS.stream().anyMatch(m -> isHttpMethodAllowed(request, m));
+                HttpResponse response = null;
+                if (hasAnyMethod) {
+                    // method match error, return 405 (NOTE(review): getHandler only null-checks the method, so this probe looks unable to succeed after the lookup above failed - confirm)
+                    response = new DefaultHttpResponse(HTTP_1_1, METHOD_NOT_ALLOWED);
+                } else {
+                    // this resource is not found, return 404
+                    response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
+                }
+                response.headers().set(Exchange.CONTENT_TYPE, "text/plain");
+                response.headers().set(Exchange.CONTENT_LENGTH, 0);
+                ctx.writeAndFlush(response);
+                ctx.close();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            // channelRead0 attaches the selected consumer handler to the channel; if present it owns the failure
+            final HttpServerChannelHandler attached = ctx.channel().attr(SERVER_HANDLER_KEY).get();
+            if (attached != null) {
+                attached.exceptionCaught(ctx, cause);
+                return;
+            }
+
+            if (cause instanceof ClosedChannelException) {
+                // The channel is closed so we do nothing here
+                LOG.debug("Channel already closed. Ignoring this exception.");
+                return;
+            }
+
+            // nobody to delegate to and the exception cannot be rethrown here: log it and answer with an empty 404
+            LOG.warn("HttpServerChannelHandler is not found as attachment to handle exception, send 404 back to the client.", cause);
+            HttpResponse notFound = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
+            notFound.headers().set(Exchange.CONTENT_TYPE, "text/plain");
+            notFound.headers().set(Exchange.CONTENT_LENGTH, 0);
+            ctx.writeAndFlush(notFound);
+            ctx.close();
+        }
+
+        /**
+         * Tells whether {@link #getHandler} finds a consumer for the request under the given method.
+         * The lookup only null-checks the method, so every non-null method yields the same answer.
+         */
+        private boolean isHttpMethodAllowed(HttpRequest request, String method) {
+            return getHandler(request, method) != null;
+        }
+
+        /**
+         * Finds the consumer handler that should serve the request.
+         * <p>
+         * Consumers are visited in the iteration order of the consumer set. One whose endpoint URI
+         * declares both "filter.headerName" and "filter.headerValue" is skipped unless the request
+         * carries that header with exactly that value; the first remaining one whose path matches wins.
+         *
+         * @param request the incoming request; its URI and headers drive the match
+         * @param method the HTTP method name; only checked for null
+         * @return the matching handler, or null when method is null or no consumer matches
+         * @throws RuntimeException any failure while inspecting a consumer, passed through ObjectHelper.wrapRuntimeCamelException
+         */
+        private HttpServerChannelHandler getHandler(HttpRequest request, String method) {
+            if (method == null) {
+                return null;
+            }
+
+            // need to strip out host and port etc, as we only need the context-path for matching.
+            // Only a target that does not start with "/" (absolute-form, "http://host:port/path") can carry
+            // them; an origin-form target is left alone even if ":<port>" shows up in its path or query.
+            String path = request.uri();
+            if (!path.startsWith("/")) {
+                int idx = path.indexOf(token);
+                if (idx > -1) {
+                    path = path.substring(idx + len);
+                }
+            }
+            // use the path as key to find the consumer handler to use
+            path = pathAsKey(path);
+
+            // the request headers are the same for every candidate, so fetch them once
+            final HttpHeaders headers = request.headers();
+            for (final HttpServerChannelHandler handler : consumers) {
+                try {
+                    final NettyHttpConsumer consumer = handler.getConsumer();
+                    final String uri = consumer.getEndpoint().getEndpointUri();
+                    final Map<String, Object> params = URISupport.parseParameters(URI.create(uri));
+
+                    if (params.containsKey("filter.headerName") && params.containsKey("filter.headerValue")) {
+                        final String filterKey = (String) params.get("filter.headerName");
+                        final String filterVal = (String) params.get("filter.headerValue");
+                        final String headerVal = headers.get(filterKey);
+
+                        // a missing or empty header never satisfies the filter
+                        if (ObjectHelper.isEmpty(headerVal) || !ObjectHelper.equal(filterVal, headerVal)) {
+                            continue;
+                        }
+                    }
+
+                    String consumerPath = consumer.getConfiguration().getPath();
+                    boolean matchOnUriPrefix = consumer.getEndpoint().getConfiguration().isMatchOnUriPrefix();
+                    // Just make sure that we get the right consumer path first
+                    if (RestConsumerContextPathMatcher.matchPath(path, consumerPath, matchOnUriPrefix)) {
+                        return handler;
+                    }
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            }
+
+            return null;
+        }
+
+        private static String pathAsKey(String path) {
+            // cater for default path
+            if (path == null || path.equals("/")) {
+                path = "";
+            }
+
+            // strip out query parameters
+            int idx = path.indexOf('?');
+            if (idx > -1) {
+                path = path.substring(0, idx);
+            }
+
+            // strip of ending /
+            if (path.endsWith("/")) {
+                path = path.substring(0, path.length() - 1);
+            }
+
+            return UnsafeUriCharactersEncoder.encodeHttpURI(path);
+        }
+
+    }
+}
diff --git a/runtime/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/component/knative-http b/runtime/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/component/knative-http
new file mode 100644
index 00000000..439d7643
--- /dev/null
+++ b/runtime/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/component/knative-http
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.knative.http.KnativeHttpComponent
\ No newline at end of file
diff --git a/runtime/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpMain.java b/runtime/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpMain.java
new file mode 100644
index 00000000..ba4099ff
--- /dev/null
+++ b/runtime/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpMain.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/** Manual runner: two knative-http consumers on port 8080, told apart by the value of the CE-Source header filter. */
+public class KnativeHttpMain {
+    public static void main(String[] args) throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext(new SimpleRegistry());
+
+        try {
+            context.disableJMX();
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("knative-http:http://0.0.0.0:8080?filter.headerName=CE-Source&filter.headerValue=CH1")
+                        .convertBodyTo(String.class)
+                        .to("log:ch-1?showAll=true&multiline=true")
+                        .setBody().constant("Hello from CH1");
+                    from("knative-http:http://0.0.0.0:8080?filter.headerName=CE-Source&filter.headerValue=CH2")
+                        .convertBodyTo(String.class)
+                        .to("log:ch-2?showAll=true&multiline=true")
+                        .setBody().constant("Hello from CH2");
+                }
+            });
+
+            context.start();
+
+            // block the main thread so the routes stay up; the context is stopped once the sleep ends or fails
+            Thread.sleep(Integer.MAX_VALUE);
+        } finally {
+            context.stop();
+        }
+    }
+}
diff --git a/runtime/camel-knative-http/src/test/resources/log4j2-test.xml b/runtime/camel-knative-http/src/test/resources/log4j2-test.xml
new file mode 100644
index 00000000..403c946e
--- /dev/null
+++ b/runtime/camel-knative-http/src/test/resources/log4j2-test.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="INFO">
+  <Appenders>
+    <Console name="STDOUT" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}|%-5level|%t|%c{1} - %msg%n"/>
+    </Console>
+    <Null name="NONE"/>
+  </Appenders>
+
+  <Loggers>
+    <Root level="INFO">
+      <AppenderRef ref="STDOUT"/>
+      <!--
+      <AppenderRef ref="NONE"/>
+      -->
+    </Root>
+  </Loggers>
+
+</Configuration>
\ No newline at end of file
diff --git a/runtime/camel-knative/pom.xml b/runtime/camel-knative/pom.xml
index e03c009f..dd035d18 100644
--- a/runtime/camel-knative/pom.xml
+++ b/runtime/camel-knative/pom.xml
@@ -48,8 +48,9 @@
             <artifactId>camel-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-undertow</artifactId>
+            <groupId>org.apache.camel.k</groupId>
+            <artifactId>camel-knative-http</artifactId>
+            <version>${project.version}</version>
         </dependency>
 
         <dependency>
@@ -91,6 +92,11 @@
             </exclusions>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-http4</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
index 733555fe..4ea19d48 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
@@ -22,10 +22,12 @@
 public final class Knative {
     public static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new Jdk8Module());
 
-    public static final String HTTP_COMPONENT = "undertow";
+    public static final String HTTP_COMPONENT = "knative-http";
     public static final String KNATIVE_PROTOCOL = "knative.protocol";
     public static final String KNATIVE_TYPE = "knative.type";
     public static final String KNATIVE_EVENT_TYPE = "knative.event.type";
+    public static final String FILTER_HEADER_NAME = "filter.header.name";
+    public static final String FILTER_HEADER_VALUE = "filter.header.value";
     public static final String CONTENT_TYPE = "content.type";
     public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json";
 
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 1b94484a..eb3a1fdf 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -37,9 +37,11 @@
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.CollectionHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.URISupport;
 import org.apache.commons.lang3.StringUtils;
 
 import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
@@ -223,6 +225,16 @@ private static Endpoint http(CamelContext context, ServiceDefinition definition)
                 uri += path;
             }
 
+            final String filterKey = definition.getMetadata().get(Knative.FILTER_HEADER_NAME);
+            final String filterVal = definition.getMetadata().get(Knative.FILTER_HEADER_VALUE);
+
+            if (ObjectHelper.isNotEmpty(filterKey) && ObjectHelper.isNotEmpty(filterVal)) {
+                uri = URISupport.appendParametersToURI(
+                    uri,
+                    CollectionHelper.mapOf("filter.headerName", filterKey, "filter.headerValue", filterVal)
+                );
+            }
+
             return context.getEndpoint(uri);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java
index 86758e22..9ec4406d 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java
@@ -121,7 +121,9 @@ public static KnativeEnvironment mandatoryLoadFromResource(CamelContext context,
             //              "port": "",
             //              "metadata": {
             //                  "service.path": "",
-            //                  "knative.event.type": ""
+            //                  "knative.event.type": "",
+            //                  "filter.header.name": "",
+            //                  "filter.header.value": ""
             //              }
             //         },
             //     ]
diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentMain.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentMain.java
new file mode 100644
index 00000000..b9836956
--- /dev/null
+++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentMain.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import java.util.Arrays;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class KnativeComponentMain {
+    /**
+     * Boots a Camel context whose knative component knows two http endpoints, "ep1" and "ep2",
+     * consumes from both and then blocks so the routes can be called by hand.
+     */
+    public static void main(String[] args) throws Exception {
+        final KnativeComponent knative = new KnativeComponent();
+        knative.setEnvironment(newEnv());
+
+        final SimpleRegistry beans = new SimpleRegistry();
+        beans.put("knative", knative);
+
+        final DefaultCamelContext camel = new DefaultCamelContext(beans);
+        try {
+            camel.disableJMX();
+            camel.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("knative:endpoint/ep1")
+                        .convertBodyTo(String.class)
+                        .to("log:ep1?showAll=true&multiline=true")
+                        .setBody().constant("Hello from CH1");
+                    from("knative:endpoint/ep2")
+                        .convertBodyTo(String.class)
+                        .to("log:ep2?showAll=true&multiline=true")
+                        .setBody().constant("Hello from CH2");
+                }
+            });
+            camel.start();
+            // park the main thread; the context is only stopped when the sleep ends or is interrupted
+            Thread.sleep(Integer.MAX_VALUE);
+        } finally {
+            camel.stop();
+        }
+    }
+
+    /**
+     * Builds the environment used by the routes: both services are declared at localhost:8080 and
+     * differ only by name and by the CE-Source value stored in their filter metadata.
+     */
+    private static KnativeEnvironment newEnv() {
+        return new KnativeEnvironment(Arrays.asList(
+            httpEndpoint("ep1", "CE1"),
+            httpEndpoint("ep2", "CE2")
+        ));
+    }
+
+    /** Describes one http endpoint service whose filter metadata names the CE-Source header and the given value. */
+    private static KnativeEnvironment.KnativeServiceDefinition httpEndpoint(String name, String source) {
+        return new KnativeEnvironment.KnativeServiceDefinition(
+            Knative.Type.endpoint,
+            Knative.Protocol.http,
+            name,
+            "localhost",
+            8080,
+            mapOf(
+                Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                Knative.CONTENT_TYPE, "text/plain",
+                Knative.FILTER_HEADER_NAME, "CE-Source",
+                Knative.FILTER_HEADER_VALUE, source
+            ));
+    }
+}
diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
index ecc265a8..631e95b6 100644
--- a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
+++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
@@ -26,7 +26,7 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.cloud.ServiceDefinition;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.undertow.UndertowEndpoint;
+import org.apache.camel.component.netty4.NettyEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.AvailablePortFinder;
 import org.junit.jupiter.api.AfterEach;
@@ -127,7 +127,7 @@ void testCreateEndpoint() throws Exception {
         assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
         assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
         assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
-        assertThat(e1.getEndpoint()).isInstanceOf(UndertowEndpoint.class);
+        assertThat(e1.getEndpoint()).isInstanceOf(NettyEndpoint.class);
         assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://my-node:9001/a/path");
 
         //
@@ -144,7 +144,7 @@ void testCreateEndpoint() throws Exception {
         assertThat(e2.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
         assertThat(e2.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
         assertThat(e2.getService()).hasFieldOrPropertyWithValue("path", "/another/path");
-        assertThat(e2.getEndpoint()).isInstanceOf(UndertowEndpoint.class);
+        assertThat(e2.getEndpoint()).isInstanceOf(NettyEndpoint.class);
         assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "http://my-node:9001/another/path");
     }
 
@@ -175,7 +175,7 @@ public void configure() throws Exception {
                 from("direct:source")
                     .to("knative:endpoint/myEndpoint");
 
-                fromF("undertow:http://localhost:%d/a/path", port)
+                fromF("netty4-http:http://localhost:%d/a/path", port)
                     .to("mock:ce");
             }
         });
@@ -230,7 +230,7 @@ public void configure() throws Exception {
                     .to("mock:ce");
 
                 from("direct:source")
-                    .toF("undertow:http://localhost:%d/a/path", port);
+                    .toF("netty4-http:http://localhost:%d/a/path", port);
             }
         });
 
@@ -293,7 +293,7 @@ public void configure() throws Exception {
                     .to("mock:ce");
 
                 from("direct:source")
-                    .toF("undertow:http://localhost:%d/a/path", port);
+                    .toF("http4://localhost:%d/a/path", port);
             }
         });
 
@@ -324,4 +324,108 @@ public void configure() throws Exception {
 
         mock.assertIsSatisfied();
     }
+
+    @Test
+    void testConsumeContentFithFilter() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "ep1",
+                "localhost",
+                port,
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.FILTER_HEADER_NAME, "CE-Source",
+                    Knative.FILTER_HEADER_VALUE, "CE1"
+                )),
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "ep2",
+                "localhost",
+                port,
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.FILTER_HEADER_NAME, "CE-Source",
+                    Knative.FILTER_HEADER_VALUE, "CE2"
+                ))
+        ));
+
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setEnvironment(env);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/ep1")
+                    .convertBodyTo(String.class)
+                    .to("log:ce1?showAll=true&multiline=true")
+                    .to("mock:ce1");
+                from("knative:endpoint/ep2")
+                    .convertBodyTo(String.class)
+                    .to("log:ce2?showAll=true&multiline=true")
+                    .to("mock:ce2");
+
+                from("direct:source")
+                    .setBody()
+                        .constant("test")
+                    .setHeader(Exchange.HTTP_METHOD)
+                        .constant("POST")
+                    .setHeader(Exchange.HTTP_QUERY)
+                        .simple("filter.headerName=CE-Source&filter.headerValue=${header.FilterVal}")
+                    .toD("http4://localhost:" + port);
+            }
+        });
+
+        context.start();
+
+        MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+        mock1.expectedMessageCount(1);
+        mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+        mock1.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
+        mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
+        mock1.expectedHeaderReceived("CE-Source", "CE1");
+        mock1.expectedBodiesReceived("test");
+
+        MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+        mock2.expectedMessageCount(1);
+        mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+        mock2.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
+        mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
+        mock2.expectedHeaderReceived("CE-Source", "CE2");
+        mock2.expectedBodiesReceived("test");
+
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader("FilterVal", "CE1");
+                e.getIn().setHeader("CE-CloudEventsVersion", "0.1");
+                e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+                e.getIn().setHeader("CE-EventID", "myEventID1");
+                e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("CE-Source", "CE1");
+            }
+        );
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader("FilterVal", "CE2");
+                e.getIn().setHeader("CE-CloudEventsVersion", "0.1");
+                e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+                e.getIn().setHeader("CE-EventID", "myEventID2");
+                e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("CE-Source", "CE2");
+            }
+        );
+
+        mock1.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
 }
diff --git a/runtime/camel-knative/src/test/resources/log4j2-test.xml b/runtime/camel-knative/src/test/resources/log4j2-test.xml
index 9af8521c..403c946e 100644
--- a/runtime/camel-knative/src/test/resources/log4j2-test.xml
+++ b/runtime/camel-knative/src/test/resources/log4j2-test.xml
@@ -9,8 +9,10 @@
 
   <Loggers>
     <Root level="INFO">
-      <!--<AppenderRef ref="STDOUT"/>-->
+      <AppenderRef ref="STDOUT"/>
+      <!--
       <AppenderRef ref="NONE"/>
+      -->
     </Root>
   </Loggers>
 
diff --git a/runtime/pom.xml b/runtime/pom.xml
index 6f903801..3ddf47c8 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -102,6 +102,7 @@
         <module>kotlin</module>
         <module>catalog-builder</module>
         <module>dependency-lister</module>
+        <module>camel-knative-http</module>
         <module>camel-knative</module>
     </modules>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services