You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/10/14 13:27:29 UTC
[camel-k-runtime] branch master updated: Support Knative
broker/trigger model #151
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new be1557b Support Knative broker/trigger model #151
new a0b31d2 Merge pull request #153 from lburgazzoli/github-151
be1557b is described below
commit be1557b17fc3f4c473ba23ceaeafa87108d10a94
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 9 16:51:44 2019 +0200
Support Knative broker/trigger model #151
---
camel-k-loader-knative/pom.xml | 5 +
.../knative/KnativeSourceRoutesLoaderTest.java | 7 +-
.../knative/deployment/DeploymentProcessor.java | 2 +-
camel-k-runtime-bom/pom.xml | 5 +
.../k/knative/yaml/parser/KnativeStepParser.java | 2 +-
.../knative/http/KnativeHttpEndpoint.java | 144 ---
.../component/knative/http/KnativeHttpTest.java | 273 ------
.../camel-knative-api}/pom.xml | 106 +-
.../camel/component/knative/spi/CloudEvent.java | 20 +-
.../knative/spi/CloudEventTypeConverter.java | 20 +-
.../component/knative/spi/CloudEventV01.java} | 51 +-
.../component/knative/spi/CloudEventV02.java} | 51 +-
.../camel/component/knative/spi/CloudEvents.java} | 41 +-
.../camel/component/knative/spi}/Knative.java | 22 +-
.../component/knative/spi}/KnativeEnvironment.java | 185 ++--
.../component/knative/spi}/KnativeSupport.java | 9 +-
.../component/knative/spi/KnativeTransport.java | 47 +
.../camel-knative-http}/pom.xml | 35 +-
.../camel/component/knative/http/KnativeHttp.java | 0
.../knative/http/KnativeHttpConsumer.java | 70 +-
.../http/KnativeHttpConsumerDispatcher.java | 0
.../http/KnativeHttpHeaderFilterStrategy.java | 0
.../knative/http/KnativeHttpProducer.java | 47 +-
.../component/knative/http/KnativeHttpSupport.java | 44 +-
.../knative/http/KnativeHttpTransport.java | 162 ++--
.../org/apache/camel/knative/transport/http | 18 +
.../component/knative/http/KnativeHttpTest.java | 1009 ++++++++++++++++++++
.../src/test/resources/log4j2-test.xml | 0
camel-knative/{ => camel-knative}/pom.xml | 17 +-
.../camel/component/knative/KnativeComponent.java | 93 +-
.../component/knative/KnativeConfiguration.java | 65 +-
.../knative/KnativeConversionProcessor.java | 1 +
.../camel/component/knative/KnativeEndpoint.java | 199 ++++
.../camel/component/knative/KnativeProducer.java | 1 -
.../component/knative/ce/CloudEventProcessor.java | 20 +-
.../component/knative/ce/CloudEventProcessors.java | 61 ++
.../knative/ce/CloudEventV01Processor.java} | 47 +-
.../knative/ce/CloudEventV02Processor.java} | 49 +-
.../component/knative/KnativeComponentTest.java | 87 ++
.../src/test/resources/environment.json | 14 +-
.../src/test/resources/log4j2-test.xml | 2 +-
camel-knative/pom.xml | 188 +---
.../camel/component/knative/KnativeEndpoint.java | 203 ----
.../knative/ce/CloudEventsProcessors.java | 65 --
.../component/knative/CloudEventsV01Test.java | 433 ---------
.../component/knative/CloudEventsV02Test.java | 433 ---------
.../component/knative/KnativeComponentTest.java | 648 -------------
pom.xml | 6 +-
.../maven/processors/CatalogProcessor3x.java | 3 +
49 files changed, 2110 insertions(+), 2900 deletions(-)
diff --git a/camel-k-loader-knative/pom.xml b/camel-k-loader-knative/pom.xml
index b65e1b0..0ce3da3 100644
--- a/camel-k-loader-knative/pom.xml
+++ b/camel-k-loader-knative/pom.xml
@@ -51,6 +51,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel.k</groupId>
+ <artifactId>camel-knative-http</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.k</groupId>
<artifactId>camel-k-loader-yaml</artifactId>
<scope>test</scope>
</dependency>
diff --git a/camel-k-loader-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java b/camel-k-loader-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
index 16d55eb..bde067b 100644
--- a/camel-k-loader-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
+++ b/camel-k-loader-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java
@@ -23,7 +23,8 @@ import java.util.stream.Stream;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.knative.KnativeComponent;
-import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.k.RoutesLoader;
@@ -72,7 +73,7 @@ public class KnativeSourceRoutesLoaderTest {
KnativeComponent component = new KnativeComponent();
component.setEnvironment(KnativeEnvironment.on(
- KnativeEnvironment.httpEndpoint("sink", "localhost", port)
+ KnativeEnvironment.endpoint(Knative.EndpointKind.sink, "sink", "localhost", port)
));
CamelContext context = new DefaultCamelContext();
@@ -131,7 +132,7 @@ public class KnativeSourceRoutesLoaderTest {
KnativeComponent component = new KnativeComponent();
component.setEnvironment(KnativeEnvironment.on(
- KnativeEnvironment.httpEndpoint("sink", "localhost", port)
+ KnativeEnvironment.endpoint(Knative.EndpointKind.source, "sink", "localhost", port)
));
CamelContext context = new DefaultCamelContext();
diff --git a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java b/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
index 3c08aba..fe7f5e0 100644
--- a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
+++ b/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
@@ -19,7 +19,7 @@ package org.apache.camel.k.quarkus.knative.deployment;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
-import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
public class DeploymentProcessor {
@BuildStep
diff --git a/camel-k-runtime-bom/pom.xml b/camel-k-runtime-bom/pom.xml
index 6136bfc..bb94d13 100644
--- a/camel-k-runtime-bom/pom.xml
+++ b/camel-k-runtime-bom/pom.xml
@@ -104,6 +104,11 @@
<!-- camel-k -->
<dependency>
<groupId>org.apache.camel.k</groupId>
+ <artifactId>camel-knative-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.k</groupId>
<artifactId>camel-knative</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/yaml/parser/KnativeStepParser.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/yaml/parser/KnativeStepParser.java
index f778ccf..581ae1d 100644
--- a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/yaml/parser/KnativeStepParser.java
+++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/yaml/parser/KnativeStepParser.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.camel.component.knative.Knative;
+import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.k.loader.yaml.parser.ProcessorStepParser;
import org.apache.camel.k.loader.yaml.parser.StartStepParser;
import org.apache.camel.k.loader.yaml.parser.StepParserSupport;
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
deleted file mode 100644
index e9d8c2b..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.util.Map;
-
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
-import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.jsse.SSLContextParameters;
-
-@UriEndpoint(
- firstVersion = "3.0.0",
- scheme = "knative-http",
- title = "KnativeHttp",
- syntax = "knative-http:host:port/path",
- label = "http",
- lenientProperties = true)
-public class KnativeHttpEndpoint extends DefaultEndpoint {
- @UriPath
- @Metadata(required = true)
- private String host;
- @UriPath
- @Metadata(required = true)
- private int port;
- @UriPath
- private String path;
-
- @UriParam(label = "advanced")
- private HeaderFilterStrategy headerFilterStrategy;
- @UriParam(label = "security")
- private SSLContextParameters sslContextParameters;
-
- @UriParam(label = "consumer")
- private Map<String, Object> headerFilter;
- @UriParam(label = "producer", defaultValue = "true")
- private Boolean throwExceptionOnFailure = Boolean.TRUE;
-
- public KnativeHttpEndpoint(String uri, KnativeHttpComponent component) {
- super(uri, component);
-
- this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public String getPath() {
- return path;
- }
-
- public void setPath(String path) {
- this.path = path;
-
- if (!this.path.startsWith("/")) {
- this.path = "/" + path;
- }
- }
-
- public HeaderFilterStrategy getHeaderFilterStrategy() {
- return headerFilterStrategy;
- }
-
- public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) {
- this.headerFilterStrategy = headerFilterStrategy;
- }
-
- public Map<String, Object> getHeaderFilter() {
- return headerFilter;
- }
-
- public void setHeaderFilter(Map<String, Object> headerFilter) {
- this.headerFilter = headerFilter;
- }
-
- public SSLContextParameters getSslContextParameters() {
- return sslContextParameters;
- }
-
- public void setSslContextParameters(SSLContextParameters sslContextParameters) {
- this.sslContextParameters = sslContextParameters;
- }
-
- public Boolean getThrowExceptionOnFailure() {
- return throwExceptionOnFailure;
- }
-
- public void setThrowExceptionOnFailure(Boolean throwExceptionOnFailure) {
- this.throwExceptionOnFailure = throwExceptionOnFailure;
- }
-
- public KnativeHttp.ServerKey getServerKey() {
- return new KnativeHttp.ServerKey(host, port);
- }
-
- @Override
- public KnativeHttpComponent getComponent() {
- return (KnativeHttpComponent)super.getComponent();
- }
-
- @Override
- public Producer createProducer() throws Exception {
- return new KnativeHttpProducer(this, getComponent().getVertx(), getComponent().getVertxHttpClientOptions());
- }
-
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- return new KnativeHttpConsumer(this, AsyncProcessorConverterHelper.convert(processor));
- }
-}
diff --git a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
deleted file mode 100644
index c820e91..0000000
--- a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
-import org.apache.camel.Exchange;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.http.common.HttpOperationFailedException;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.AvailablePortFinder;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class KnativeHttpTest {
-
- private CamelContext context;
- private ProducerTemplate template;
- private int port;
-
- // **************************
- //
- // Setup
- //
- // **************************
-
- @BeforeEach
- public void before() {
- this.context = new DefaultCamelContext();
- this.template = this.context.createProducerTemplate();
- this.port = AvailablePortFinder.getNextAvailable();
- }
-
- @AfterEach
- public void after() {
- if (this.context != null) {
- this.context.stop();
- }
- }
-
- // **************************
- //
- // Tests
- //
- // **************************
-
- @Test
- void testWithPaths() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("knative-http:0.0.0.0:%d/a/1", port)
- .routeId("r1")
- .setBody().simple("${routeId}")
- .to("mock:r1");
- b.fromF("knative-http:0.0.0.0:%d/a/2", port)
- .routeId("r2")
- .setBody().simple("${routeId}")
- .to("mock:r2");
-
- b.from("direct:start")
- .toD("undertow:http://localhost:" + port + "/a/${body}");
- });
-
- context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1);
- context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1);
- context.start();
-
- assertThat(template.requestBody("direct:start", "1", String.class)).isEqualTo("r1");
- assertThat(template.requestBody("direct:start", "2", String.class)).isEqualTo("r2");
-
- MockEndpoint.assertIsSatisfied(context);
- }
-
- @Test
- void testWithFilters() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port)
- .routeId("r1")
- .setBody().simple("${routeId}")
- .to("mock:r1");
- b.fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port)
- .routeId("r2")
- .setBody().simple("${routeId}")
- .to("mock:r2");
- b.fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port)
- .routeId("r3")
- .setBody().simple("${routeId}")
- .to("mock:r3");
-
- b.from("direct:start")
- .setHeader("MyHeader").body()
- .toF("undertow:http://localhost:%d", port);
- });
-
- context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1);
- context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1);
- context.start();
-
- assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
- assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
- assertThat(template.requestBody("direct:start", "t1", String.class)).isEqualTo("r3");
- assertThat(template.requestBody("direct:start", "t2", String.class)).isEqualTo("r3");
-
- MockEndpoint.assertIsSatisfied(context);
- }
-
- @Test
- void testWithRexFilters() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port)
- .routeId("r1")
- .setBody().simple("${routeId}");
-
- b.from("direct:start")
- .setHeader("MyHeader").body()
- .toF("undertow:http://localhost:%d", port);
- });
-
- context.start();
-
- assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
- assertThat(template.request("direct:start", e -> e.getMessage().setBody("t1"))).satisfies(e -> {
- assertThat(e.isFailed()).isTrue();
- assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
- });
- }
-
- @Test
- void testRemoveConsumer() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port)
- .routeId("r1")
- .setBody().simple("${routeId}");
- b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port)
- .routeId("r2")
- .setBody().simple("${routeId}");
- });
- RouteBuilder.addRoutes(context, b -> {
- b.from("direct:start")
- .setHeader("h").body()
- .toF("undertow:http://localhost:%d", port);
- });
-
- context.start();
-
- assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
- assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
-
- context.getRouteController().stopRoute("r2");
-
- assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
- assertThat(e.isFailed()).isTrue();
- assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
- });
- }
-
- @Test
- void testAddConsumer() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port)
- .routeId("r1")
- .setBody().simple("${routeId}");
- });
- RouteBuilder.addRoutes(context, b -> {
- b.from("direct:start")
- .setHeader("h").body()
- .toF("undertow:http://localhost:%d", port);
- });
-
- context.start();
-
- assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
- assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
- assertThat(e.isFailed()).isTrue();
- assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
- });
-
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port)
- .routeId("r2")
- .setBody().simple("${routeId}");
- });
-
- assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
- assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
- }
-
- @Test
- void testInvokeEndpoint() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.fromF("undertow:http://0.0.0.0:%d", port)
- .routeId("endpoint")
- .setBody().simple("${routeId}")
- .to("mock:endpoint");
-
- b.from("direct:start")
- .toF("knative-http:0.0.0.0:%d", port)
- .to("mock:start");
- });
-
- MockEndpoint mock1 = context.getEndpoint("mock:endpoint", MockEndpoint.class);
- mock1.expectedHeaderReceived("Host", "0.0.0.0");
- mock1.expectedMessageCount(1);
-
- MockEndpoint mock2 = context.getEndpoint("mock:start", MockEndpoint.class);
- mock2.expectedBodiesReceived("endpoint");
- mock2.expectedMessageCount(1);
-
- context.start();
-
- template.sendBody("direct:start", "1");
-
- mock1.assertIsSatisfied();
- mock2.assertIsSatisfied();
- }
-
- @Test
- void testInvokeNotExistingEndpoint() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.from("direct:start")
- .toF("knative-http:0.0.0.0:%d", port)
- .to("mock:start");
- });
-
- context.start();
-
- Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
- assertThat(exchange.isFailed()).isTrue();
- assertThat(exchange.getException()).isInstanceOf(CamelException.class);
- assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking");
- }
-
- @Test
- void testInvokeEndpointWithError() throws Exception {
- RouteBuilder.addRoutes(context, b -> {
- b.from("direct:start")
- .toF("knative-http:0.0.0.0:%d", port)
- .to("mock:start");
- b.fromF("undertow:http://0.0.0.0:%d", port)
- .routeId("endpoint")
- .process(e -> {
- throw new RuntimeException("endpoint error");
- });
- });
-
- context.start();
-
- Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
- assertThat(exchange.isFailed()).isTrue();
- assertThat(exchange.getException()).isInstanceOf(CamelException.class);
- assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking");
- assertThat(exchange.getException()).hasMessageContaining("with statusCode: 500, statusMessage: Internal Server Error");
- }
-}
-
diff --git a/camel-knative-http/pom.xml b/camel-knative/camel-knative-api/pom.xml
similarity index 51%
copy from camel-knative-http/pom.xml
copy to camel-knative/camel-knative-api/pom.xml
index 285e021..c562d24 100644
--- a/camel-knative-http/pom.xml
+++ b/camel-knative/camel-knative-api/pom.xml
@@ -20,12 +20,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.k</groupId>
- <artifactId>camel-k-runtime-parent</artifactId>
+ <artifactId>camel-knative-parent</artifactId>
<version>1.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>camel-knative-http</artifactId>
+ <artifactId>camel-knative-api</artifactId>
<dependencies>
@@ -46,114 +46,30 @@
<artifactId>camel-core-engine</artifactId>
<scope>provided</scope>
</dependency>
-
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-web</artifactId>
- <version>${vertx.version}</version>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-web-client</artifactId>
- <version>${vertx.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>spi-annotations</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>apt</artifactId>
+ <artifactId>camel-cloud</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- <version>${commons-collections4.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</dependency>
-
- <!-- ****************************** -->
- <!-- -->
- <!-- TESTS -->
- <!-- -->
- <!-- ****************************** -->
-
<dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-test</artifactId>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-properties</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-mock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-direct</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-log</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-undertow</artifactId>
- <scope>test</scope>
+ <artifactId>spi-annotations</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-bean</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <version>${assertj.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>${log4j2.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>${log4j2.version}</version>
- <scope>test</scope>
+ <artifactId>apt</artifactId>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
similarity index 54%
copy from camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
copy to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
index 3c08aba..f9c5ad9 100644
--- a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.k.quarkus.knative.deployment;
+package org.apache.camel.component.knative.spi;
-import io.quarkus.deployment.annotations.BuildProducer;
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
-import org.apache.camel.component.knative.KnativeEnvironment;
+public interface CloudEvent {
+ String version();
+ Attributes attributes();
-public class DeploymentProcessor {
- @BuildStep
- void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, KnativeEnvironment.class));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, KnativeEnvironment.KnativeServiceDefinition.class));
+ interface Attributes {
+ String id();
+ String source();
+ String spec();
+ String type();
+ String time();
}
}
diff --git a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventTypeConverter.java
similarity index 54%
copy from camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
copy to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventTypeConverter.java
index 3c08aba..f0234b3 100644
--- a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventTypeConverter.java
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.k.quarkus.knative.deployment;
+package org.apache.camel.component.knative.spi;
-import io.quarkus.deployment.annotations.BuildProducer;
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
-import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.Converter;
-public class DeploymentProcessor {
- @BuildStep
- void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, KnativeEnvironment.class));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, KnativeEnvironment.KnativeServiceDefinition.class));
+@Converter(loader = true)
+public final class CloudEventTypeConverter {
+ private CloudEventTypeConverter() {
+ }
+
+ @Converter
+ public static CloudEvent fromSpecVersion(String version) {
+ return CloudEvents.fromSpecVersion(version);
}
}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java
similarity index 50%
copy from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
copy to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java
index 65a04b8..845104f 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java
@@ -14,31 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.knative;
+package org.apache.camel.component.knative.spi;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
+final class CloudEventV01 implements CloudEvent {
+ public static final String VERSION = "0.1";
+ public static final Attributes ATTRIBUTES = new Attributes() {
+ @Override
+ public String id() {
+ return "CE-EventID";
+ }
-/**
- * Converts objects prior to serializing them to external endpoints or channels
- */
-public class KnativeConversionProcessor implements Processor {
+ @Override
+ public String source() {
+ return "CE-Source";
+ }
+
+ @Override
+ public String spec() {
+ return "CE-CloudEventsVersion";
+ }
- private boolean enabled;
+ @Override
+ public String type() {
+ return "CE-EventType";
+ }
+
+ @Override
+ public String time() {
+ return "CE-EventTime";
+ }
+ };
- public KnativeConversionProcessor(boolean enabled) {
- this.enabled = enabled;
+ @Override
+ public String version() {
+ return VERSION;
}
@Override
- public void process(Exchange exchange) throws Exception {
- if (enabled) {
- Object body = exchange.getIn().getBody();
- if (body != null) {
- byte[] newBody = Knative.MAPPER.writeValueAsBytes(body);
- exchange.getIn().setBody(newBody);
- exchange.getIn().setHeader(Exchange.CONTENT_TYPE, "application/json");
- }
- }
+ public Attributes attributes() {
+ return ATTRIBUTES;
}
}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java
similarity index 51%
copy from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
copy to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java
index 65a04b8..7b28dcc 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java
@@ -14,31 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.knative;
+package org.apache.camel.component.knative.spi;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
+final class CloudEventV02 implements CloudEvent {
+ public static final String VERSION = "0.2";
+ public static final Attributes ATTRIBUTES = new Attributes() {
+ @Override
+ public String id() {
+ return "ce-id";
+ }
-/**
- * Converts objects prior to serializing them to external endpoints or channels
- */
-public class KnativeConversionProcessor implements Processor {
+ @Override
+ public String source() {
+ return "ce-source";
+ }
+
+ @Override
+ public String spec() {
+ return "ce-specversion";
+ }
- private boolean enabled;
+ @Override
+ public String type() {
+ return "ce-type";
+ }
+
+ @Override
+ public String time() {
+ return "ce-time";
+ }
+ };
- public KnativeConversionProcessor(boolean enabled) {
- this.enabled = enabled;
+ @Override
+ public String version() {
+ return VERSION;
}
@Override
- public void process(Exchange exchange) throws Exception {
- if (enabled) {
- Object body = exchange.getIn().getBody();
- if (body != null) {
- byte[] newBody = Knative.MAPPER.writeValueAsBytes(body);
- exchange.getIn().setBody(newBody);
- exchange.getIn().setHeader(Exchange.CONTENT_TYPE, "application/json");
- }
- }
+ public Attributes attributes() {
+ return ATTRIBUTES;
}
}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
similarity index 52%
copy from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
copy to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
index 65a04b8..ff5542f 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
@@ -14,31 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.knative;
+package org.apache.camel.component.knative.spi;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
+import java.util.Objects;
-/**
- * Converts objects prior to serializing them to external endpoints or channels
- */
-public class KnativeConversionProcessor implements Processor {
+public enum CloudEvents implements CloudEvent {
+ V01(new CloudEventV01()),
+ V02(new CloudEventV02());
+
+ private final CloudEvent instance;
- private boolean enabled;
+ CloudEvents(CloudEvent instance) {
+ this.instance = instance;
+ }
- public KnativeConversionProcessor(boolean enabled) {
- this.enabled = enabled;
+ @Override
+ public String version() {
+ return instance.version();
}
@Override
- public void process(Exchange exchange) throws Exception {
- if (enabled) {
- Object body = exchange.getIn().getBody();
- if (body != null) {
- byte[] newBody = Knative.MAPPER.writeValueAsBytes(body);
- exchange.getIn().setBody(newBody);
- exchange.getIn().setHeader(Exchange.CONTENT_TYPE, "application/json");
+ public Attributes attributes() {
+ return instance.attributes();
+ }
+
+ public static CloudEvent fromSpecVersion(String version) {
+ for (CloudEvent event: CloudEvents.values()) {
+ if (Objects.equals(event.version(), version)) {
+ return event;
}
}
+
+ throw new IllegalArgumentException("Unable to find an implementation fo CloudEvents spec: " + version);
}
}
+
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
similarity index 75%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
rename to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
index 605ac14..75aa80e 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/Knative.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.knative;
+package org.apache.camel.component.knative.spi;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
@@ -22,16 +22,15 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
public final class Knative {
public static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new Jdk8Module());
- public static final int DEFAULT_HTTP_PORT = 80;
-
- public static final String HTTP_COMPONENT = "knative-http";
- public static final String KNATIVE_PROTOCOL = "knative.protocol";
+ public static final String KNATIVE_TRANSPORT_RESOURCE_PATH = "META-INF/services/org/apache/camel/knative/transport/";
+ public static final String KNATIVE_FILTER_PREFIX = "filter.";
public static final String KNATIVE_TYPE = "knative.type";
public static final String KNATIVE_EVENT_TYPE = "knative.event.type";
- public static final String FILTER_HEADER_NAME = "filter.header.name";
- public static final String FILTER_HEADER_VALUE = "filter.header.value";
+ public static final String KNATIVE_KIND = "knative.kind";
+ public static final String KNATIVE_API_VERSION = "knative.apiVersion";
public static final String CONTENT_TYPE = "content.type";
public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json";
+ public static final String CAMEL_ENDPOINT_KIND = "camel.endpoint.kind";
public static final String SERVICE_META_HOST = "service.host";
public static final String SERVICE_META_ZONE = "service.zone";
@@ -40,13 +39,18 @@ public final class Knative {
private Knative() {
}
+ public enum EndpointKind {
+ source,
+ sink,
+ }
+
public enum Type {
endpoint,
- channel
+ channel,
+ event
}
public enum Protocol {
http,
- https
}
}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
similarity index 56%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java
rename to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
index 814740a..f74fb5b 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEnvironment.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.knative;
+package org.apache.camel.component.knative.spi;
import java.io.InputStream;
import java.io.Reader;
@@ -22,11 +22,9 @@ import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.stream.Stream;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,7 +32,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.cloud.DefaultServiceDefinition;
import org.apache.camel.support.ResourceHelper;
-import org.apache.camel.util.StringHelper;
/*
* Assuming it is loaded from a json for now
@@ -53,24 +50,12 @@ public class KnativeEnvironment {
return services.stream();
}
- public Optional<KnativeServiceDefinition> lookupService(Knative.Type type, String name, String... aliases) {
- return Stream.concat(Stream.of(name), Stream.of(aliases))
- .sequential()
- .map(n -> lookup(type, n))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .findFirst();
- }
-
- public KnativeServiceDefinition mandatoryLookupService(Knative.Type type, String name) {
- return lookupService(type, name).orElseThrow(
- () -> new IllegalArgumentException("Unable to find the service \"" + name + "\" with type \"" + type + "\"")
- );
- }
-
-
- public KnativeServiceDefinition lookupServiceOrDefault(Knative.Type type, String name) {
- return lookupService(type, name, "default").orElseGet(() -> computeServiceDefinition(type, name));
+ public Stream<KnativeServiceDefinition> lookup(Knative.Type type, String name) {
+ return services.stream()
+ .filter(definition -> {
+ return Objects.equals(type.name(), definition.getMetadata().get(Knative.KNATIVE_TYPE))
+ && Objects.equals(name, definition.getName());
+ });
}
// ************************
@@ -79,62 +64,6 @@ public class KnativeEnvironment {
//
// ************************
- private Optional<KnativeServiceDefinition> lookup(Knative.Type type, String name) {
- final String contextPath = StringHelper.after(name, "/");
- final String serviceName = (contextPath == null) ? name : StringHelper.before(name, "/");
-
- return services.stream()
- .filter(definition -> {
- return Objects.equals(type.name(), definition.getMetadata().get(Knative.KNATIVE_TYPE))
- && Objects.equals(serviceName, definition.getName());
- })
- .map(definition -> {
- //
- // The context path set on the endpoint overrides the one
- // eventually provided by the service definition.
- //
- if (contextPath != null) {
- return new KnativeServiceDefinition(
- definition.getType(),
- definition.getProtocol(),
- definition.getName(),
- definition.getHost(),
- definition.getPort(),
- KnativeSupport.mergeMaps(
- definition.getMetadata(),
- Collections.singletonMap(Knative.SERVICE_META_PATH, "/" + contextPath)
- )
- );
- }
-
- return definition;
- })
- .findFirst();
-
- }
-
- public static KnativeServiceDefinition computeServiceDefinition(Knative.Type type, String name) {
- String contextPath = StringHelper.after(name, "/");
- String serviceName = (contextPath == null) ? name : StringHelper.before(name, "/");
- Map<String, String> meta = new HashMap<>();
-
- if (type == Knative.Type.channel && !serviceName.endsWith("-channel")) {
- serviceName += "-channel";
-
- }
- if (contextPath != null) {
- meta.put(Knative.SERVICE_META_PATH, "/" + contextPath);
- }
-
- return new KnativeEnvironment.KnativeServiceDefinition(
- type,
- Knative.Protocol.http,
- serviceName,
- "",
- -1,
- meta
- );
- }
public static KnativeEnvironment mandatoryLoadFromSerializedString(CamelContext context, String configuration) throws Exception {
try (Reader reader = new StringReader(configuration)) {
@@ -152,15 +81,16 @@ public class KnativeEnvironment {
// "services": [
// {
// "type": "channel|endpoint",
- // "protocol": "http",
// "name": "",
// "host": "",
// "port": "",
// "metadata": {
// "service.path": "",
+ // "filter.header": "value",
// "knative.event.type": "",
- // "filter.header.name": "",
- // "filter.header.value": ""
+ // "knative.kind": "",
+ // "knative.apiVersion": "",
+ // "camel.endpoint.kind": "source|sink",
// }
// },
// ]
@@ -171,24 +101,85 @@ public class KnativeEnvironment {
}
}
- public static KnativeServiceDefinition httpEndpoint(String name, String host, int port) {
- return new KnativeEnvironment.KnativeServiceDefinition(
+ public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKind, String name, String host, int port) {
+ return entry(
+ endpointKind,
Knative.Type.endpoint,
- Knative.Protocol.http,
name,
host,
port,
- Collections.emptyMap());
+ Collections.emptyMap()
+ );
}
- public static KnativeServiceDefinition httpChannel(String name, String host, int port) {
- return new KnativeEnvironment.KnativeServiceDefinition(
+ public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) {
+ return entry(
+ endpointKind,
+ Knative.Type.endpoint,
+ name,
+ host,
+ port,
+ metadata
+ );
+ }
+
+ public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port) {
+ return entry(
+ endpointKind,
Knative.Type.channel,
- Knative.Protocol.http,
name,
host,
port,
- Collections.emptyMap());
+ Collections.emptyMap()
+ );
+ }
+
+ public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) {
+ return entry(
+ endpointKind,
+ Knative.Type.channel,
+ name,
+ host,
+ port,
+ metadata
+ );
+ }
+
+ public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port) {
+ return entry(
+ endpointKind,
+ Knative.Type.event,
+ name,
+ host,
+ port,
+ Collections.emptyMap()
+ );
+ }
+
+ public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) {
+ return entry(
+ endpointKind,
+ Knative.Type.event,
+ name,
+ host,
+ port,
+ metadata
+ );
+ }
+
+ public static KnativeServiceDefinition entry(Knative.EndpointKind endpointKind, Knative.Type type, String name, String host, int port, Map<String, String> metadata) {
+ return new KnativeEnvironment.KnativeServiceDefinition(
+ type,
+ name,
+ host,
+ port,
+ KnativeSupport.mergeMaps(
+ metadata,
+ KnativeSupport.mapOf(
+ Knative.CAMEL_ENDPOINT_KIND, endpointKind.name()
+ )
+ )
+ );
}
public static KnativeEnvironment on(KnativeServiceDefinition... definitions) {
@@ -205,7 +196,6 @@ public class KnativeEnvironment {
@JsonCreator
public KnativeServiceDefinition(
@JsonProperty(value = "type", required = true) Knative.Type type,
- @JsonProperty(value = "protocol", required = true) Knative.Protocol protocol,
@JsonProperty(value = "name", required = true) String name,
@JsonProperty(value = "host", required = true) String host,
@JsonProperty(value = "port", required = true) int port,
@@ -218,8 +208,7 @@ public class KnativeEnvironment {
KnativeSupport.mergeMaps(
metadata,
KnativeSupport.mapOf(
- Knative.KNATIVE_TYPE, type.name(),
- Knative.KNATIVE_PROTOCOL, protocol.name())
+ Knative.KNATIVE_TYPE, type.name())
)
);
}
@@ -228,16 +217,20 @@ public class KnativeEnvironment {
return Knative.Type.valueOf(getMetadata().get(Knative.KNATIVE_TYPE));
}
- public Knative.Protocol getProtocol() {
- return Knative.Protocol.valueOf(getMetadata().get(Knative.KNATIVE_PROTOCOL));
- }
-
public String getPath() {
return getMetadata().get(Knative.SERVICE_META_PATH);
}
+ public String getPathOrDefault(String path) {
+ return getMetadata().getOrDefault(Knative.SERVICE_META_PATH, path);
+ }
+
public String getEventType() {
return getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
}
+
+ public int getPortOrDefault(int port) {
+ return getPort() != -1 ? getPort() : port;
+ }
}
}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeSupport.java
similarity index 84%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
rename to camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeSupport.java
index 8583c3d..0d28dae 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeSupport.java
@@ -14,22 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.knative;
+package org.apache.camel.component.knative.spi;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
-
-import org.apache.camel.Exchange;
public final class KnativeSupport {
private KnativeSupport() {
}
- public static boolean hasStructuredContent(Exchange exchange) {
- return Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE);
- }
-
@SafeVarargs
public static <K, V> Map<K, V> mergeMaps(Map<K, V> map, Map<K, V>... maps) {
Map<K, V> answer = new HashMap<>();
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
new file mode 100644
index 0000000..e9936a8
--- /dev/null
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.spi;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.Service;
+
+public interface KnativeTransport extends Service {
+ /**
+ * Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
+ *
+ * @param endpoint the endpoint for which the producer should be created
+ * @param service the service definition containing information about how make reach the target service.
+ * @return
+ */
+ Producer createProducer(
+ Endpoint endpoint,
+ KnativeEnvironment.KnativeServiceDefinition service);
+
+ /**
+ * Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
+ *
+ * @param endpoint the endpoint for which the consumer should be created.
+ * @param service the service definition containing information about how make the route reachable from knative.
+ * @return
+ */
+ Consumer createConsumer(
+ Endpoint endpoint,
+ KnativeEnvironment.KnativeServiceDefinition service, Processor processor);
+}
diff --git a/camel-knative-http/pom.xml b/camel-knative/camel-knative-http/pom.xml
similarity index 91%
rename from camel-knative-http/pom.xml
rename to camel-knative/camel-knative-http/pom.xml
index 285e021..ae36949 100644
--- a/camel-knative-http/pom.xml
+++ b/camel-knative/camel-knative-http/pom.xml
@@ -20,7 +20,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.k</groupId>
- <artifactId>camel-k-runtime-parent</artifactId>
+ <artifactId>camel-knative-parent</artifactId>
<version>1.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -46,6 +46,16 @@
<artifactId>camel-core-engine</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cloud</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.camel.k</groupId>
+ <artifactId>camel-knative-api</artifactId>
+ </dependency>
<dependency>
<groupId>io.vertx</groupId>
@@ -59,17 +69,6 @@
</dependency>
<dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>spi-annotations</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>apt</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
@@ -82,6 +81,12 @@
<!-- ****************************** -->
<dependency>
+ <groupId>org.apache.camel.k</groupId>
+ <artifactId>camel-knative</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<exclusions>
@@ -137,6 +142,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
similarity index 100%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
similarity index 79%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index a9bbedf..cb99b0d 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -21,7 +21,6 @@ import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
-import java.util.Objects;
import java.util.function.Predicate;
import io.vertx.core.buffer.Buffer;
@@ -29,69 +28,52 @@ import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
-import org.apache.camel.util.ObjectHelper;
public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler {
+ private final KnativeHttpTransport transport;
private final Predicate<HttpServerRequest> filter;
+ private final KnativeHttp.ServerKey key;
+ private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition;
+ private final HeaderFilterStrategy headerFilterStrategy;
- public KnativeHttpConsumer(KnativeHttpEndpoint endpoint, Processor processor) {
- super(endpoint, processor);
-
- filter = v -> {
- if (!Objects.equals(endpoint.getPath(), v.path())) {
- return false;
- }
- if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) {
- return true;
- }
-
- for (Map.Entry<String, Object> entry : endpoint.getHeaderFilter().entrySet()) {
- String ref = entry.getValue().toString();
- String val = v.getHeader(entry.getKey());
- boolean matches = Objects.equals(ref, val) || val.matches(ref);
-
- if (!matches) {
- return false;
- }
- }
+ public KnativeHttpConsumer(
+ KnativeHttpTransport transport,
+ Endpoint endpoint,
+ KnativeEnvironment.KnativeServiceDefinition serviceDefinition,
+ Processor processor) {
- return true;
- };
- }
+ super(endpoint, processor);
- @Override
- public KnativeHttpEndpoint getEndpoint() {
- return (KnativeHttpEndpoint) super.getEndpoint();
+ this.transport = transport;
+ this.serviceDefinition = serviceDefinition;
+ this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
+ this.key = new KnativeHttp.ServerKey(serviceDefinition.getHost(), serviceDefinition.getPortOrDefault(KnativeHttp.DEFAULT_PORT));
+ this.filter = KnativeHttpSupport.createFilter(serviceDefinition);
}
@Override
protected void doStart() throws Exception {
- final KnativeHttpEndpoint endpoint = getEndpoint();
- final KnativeHttpComponent component = endpoint.getComponent();
- final KnativeHttp.ServerKey key = endpoint.getServerKey();
-
- component.getDispatcher(key).bind(this);
+ this.transport.getDispatcher(key).bind(this);
super.doStart();
}
@Override
protected void doStop() throws Exception {
- final KnativeHttpEndpoint endpoint = getEndpoint();
- final KnativeHttpComponent component = endpoint.getComponent();
- final KnativeHttp.ServerKey key = endpoint.getServerKey();
-
- component.getDispatcher(key).unbind(this);
+ this.transport.getDispatcher(key).unbind(this);
super.doStop();
}
@@ -153,12 +135,11 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.
}
private Message toMessage(HttpServerRequest request, Exchange exchange) {
- KnativeHttpEndpoint endpoint = getEndpoint();
Message message = new DefaultMessage(exchange.getContext());
String path = request.path();
- if (endpoint.getPath() != null) {
- String endpointPath = endpoint.getPath();
+ if (serviceDefinition.getPath() != null) {
+ String endpointPath = serviceDefinition.getPath();
String matchPath = path.toLowerCase(Locale.US);
String match = endpointPath.toLowerCase(Locale.US);
@@ -168,12 +149,12 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.
}
for (Map.Entry<String, String> entry : request.headers().entries()) {
- if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
}
}
for (Map.Entry<String, String> entry : request.params().entries()) {
- if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
}
}
@@ -204,7 +185,7 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.
if (headerValue == null) {
continue;
}
- if (!getEndpoint().getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+ if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
response.putHeader(key, headerValue);
}
}
@@ -237,4 +218,5 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.
message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body)
);
}
+
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
similarity index 100%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
similarity index 100%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
similarity index 76%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
index 40792b0..fd4328e 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -27,9 +27,12 @@ import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.MessageHelper;
@@ -41,20 +44,27 @@ import org.slf4j.LoggerFactory;
public class KnativeHttpProducer extends DefaultAsyncProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class);
+ private final KnativeHttpTransport transport;
+ private final KnativeEnvironment.KnativeServiceDefinition serviceDefinition;
private final Vertx vertx;
private final WebClientOptions clientOptions;
+ private final HeaderFilterStrategy headerFilterStrategy;
+
private WebClient client;
- public KnativeHttpProducer(KnativeHttpEndpoint endpoint, Vertx vertx, WebClientOptions clientOptions) {
+ public KnativeHttpProducer(
+ KnativeHttpTransport transport,
+ Endpoint endpoint,
+ KnativeEnvironment.KnativeServiceDefinition serviceDefinition,
+ Vertx vertx,
+ WebClientOptions clientOptions) {
super(endpoint);
+ this.transport = transport;
+ this.serviceDefinition = serviceDefinition;
this.vertx = ObjectHelper.notNull(vertx, "vertx");
this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new);
- }
-
- @Override
- public KnativeHttpEndpoint getEndpoint() {
- return (KnativeHttpEndpoint)super.getEndpoint();
+ this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
}
@Override
@@ -70,11 +80,10 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
return true;
}
- KnativeHttpEndpoint endpoint = getEndpoint();
Message message = exchange.getMessage();
MultiMap headers = MultiMap.caseInsensitiveMultiMap();
- headers.add(HttpHeaders.HOST, endpoint.getHost());
+ headers.add(HttpHeaders.HOST, serviceDefinition.getHost());
headers.add(HttpHeaders.CONTENT_LENGTH, Integer.toString(payload.length));
String contentType = MessageHelper.getContentType(message);
@@ -83,12 +92,22 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
}
for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
- if (!endpoint.getHeaderFilterStrategy().applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
headers.add(entry.getKey(), entry.getValue().toString());
}
}
- client.post(endpoint.getPort(), endpoint.getHost(), endpoint.getPath())
+ if (ObjectHelper.isEmpty(serviceDefinition.getHost())) {
+ exchange.setException(new CamelException("HTTP operation failed because host is not defined"));
+ callback.done(true);
+
+ return true;
+ }
+
+ final int port = serviceDefinition.getPortOrDefault(KnativeHttp.DEFAULT_PORT);
+ final String path = serviceDefinition.getPathOrDefault(KnativeHttp.DEFAULT_PATH);
+
+ client.post(port, serviceDefinition.getHost(), path)
.putHeaders(headers)
.sendBuffer(Buffer.buffer(payload), response -> {
if (response.succeeded()) {
@@ -98,7 +117,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
for (Map.Entry<String, String> entry : result.headers().entries()) {
- if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
KnativeHttpSupport.appendHeader(answer.getHeaders(), entry.getKey(), entry.getValue());
}
}
@@ -121,7 +140,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
exchange.setMessage(answer);
- } else if (response.failed() && endpoint.getThrowExceptionOnFailure()) {
+ } else if (response.failed()) {
String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(getURI());
if (response.result() != null) {
exceptionMessage += " with statusCode: " + response.result().statusCode();
@@ -155,7 +174,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
}
private String getURI() {
- String p = getEndpoint().getPath();
+ String p = serviceDefinition.getPath();
if (p == null) {
p = KnativeHttp.DEFAULT_PATH;
@@ -163,6 +182,6 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
p = "/" + p;
}
- return String.format("http://%s:%d%s", getEndpoint().getHost(), getEndpoint().getPort(), p);
+ return String.format("http://%s:%d%s", serviceDefinition.getHost(), serviceDefinition.getPort(), p);
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
similarity index 51%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index a858f75..daeeeff 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -19,8 +19,15 @@ package org.apache.camel.component.knative.http;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
+import io.vertx.core.http.HttpServerRequest;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.util.ObjectHelper;
+
public final class KnativeHttpSupport {
private KnativeHttpSupport() {
}
@@ -43,7 +50,40 @@ public final class KnativeHttpSupport {
headers.put(key, value);
}
- public static Map<String, String> asStringMap(Map<String, Object> map) {
- return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ public static Predicate<HttpServerRequest> createFilter(KnativeEnvironment.KnativeServiceDefinition serviceDefinition) {
+ Map<String, String> filters = serviceDefinition.getMetadata().entrySet().stream()
+ .filter(e -> e.getKey().startsWith(Knative.KNATIVE_FILTER_PREFIX))
+ .collect(Collectors.toMap(
+ e -> e.getKey().substring(Knative.KNATIVE_FILTER_PREFIX.length()),
+ e -> e.getValue()
+ ));
+
+
+ String path = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttp.DEFAULT_PATH);
+
+ return v -> {
+ if (!Objects.equals(path, v.path())) {
+ return false;
+ }
+ if (filters.isEmpty()) {
+ return true;
+ }
+
+ for (Map.Entry<String, String> entry : filters.entrySet()) {
+ String ref = entry.getValue();
+ String val = v.getHeader(entry.getKey());
+
+ if (val == null) {
+ return false;
+ }
+
+ boolean matches = Objects.equals(ref, val) || val.matches(ref);
+ if (!matches) {
+ return false;
+ }
+ }
+
+ return true;
+ };
}
}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
similarity index 74%
rename from camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
rename to camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index ec27ecf..0f255a5 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -22,50 +22,97 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.regex.Matcher;
-
-import static java.lang.Integer.parseInt;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.client.WebClientOptions;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.annotations.Component;
-import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeTransport;
+import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.PropertiesHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Component("knative-http")
-public class KnativeHttpComponent extends DefaultComponent {
- private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class);
+public class KnativeHttpTransport extends ServiceSupport implements CamelContextAware, KnativeTransport {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpTransport.class);
private final Map<KnativeHttp.ServerKey, KnativeHttpConsumerDispatcher> registry;
- @Metadata(label = "advanced")
private Vertx vertx;
- @Metadata(label = "advanced")
private VertxOptions vertxOptions;
- @Metadata(label = "advanced")
private HttpServerOptions vertxHttpServerOptions;
- @Metadata(label = "advanced")
private WebClientOptions vertxHttpClientOptions;
+ private CamelContext camelContext;
private boolean localVertx;
private ExecutorService executor;
- public KnativeHttpComponent() {
+ public KnativeHttpTransport() {
this.registry = new ConcurrentHashMap<>();
this.localVertx = false;
}
+ public Vertx getVertx() {
+ return vertx;
+ }
+
+ public void setVertx(Vertx vertx) {
+ this.vertx = vertx;
+ }
+
+ public VertxOptions getVertxOptions() {
+ return vertxOptions;
+ }
+
+ public void setVertxOptions(VertxOptions vertxOptions) {
+ this.vertxOptions = vertxOptions;
+ }
+
+ public HttpServerOptions getVertxHttpServerOptions() {
+ return vertxHttpServerOptions;
+ }
+
+ public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) {
+ this.vertxHttpServerOptions = vertxHttpServerOptions;
+ }
+
+ public WebClientOptions getVertxHttpClientOptions() {
+ return vertxHttpClientOptions;
+ }
+
+ public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) {
+ this.vertxHttpClientOptions = vertxHttpClientOptions;
+ }
+
+ KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) {
+ return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions));
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
@Override
- protected void doInit() throws Exception {
- super.doInit();
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+ // *****************************
+ //
+ // Lifecycle
+ //
+ // *****************************
+
+ @Override
+ protected void doStart() throws Exception {
this.executor = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-component");
if (this.vertx != null) {
@@ -101,9 +148,7 @@ public class KnativeHttpComponent extends DefaultComponent {
}
@Override
- protected void doShutdown() throws Exception {
- super.doShutdown();
-
+ protected void doStop() throws Exception {
if (this.vertx != null && this.localVertx) {
Future<?> future = this.executor.submit(
() -> {
@@ -146,74 +191,19 @@ public class KnativeHttpComponent extends DefaultComponent {
}
}
- @Override
- protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- Matcher matcher = KnativeHttp.ENDPOINT_PATTERN.matcher(remaining);
- if (!matcher.find()) {
- throw new IllegalArgumentException("Bad URI: " + remaining);
- }
-
- KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this);
- ep.setHeaderFilter(PropertiesHelper.extractProperties(parameters, "filter.", true));
-
- switch (matcher.groupCount()) {
- case 1:
- ep.setHost(matcher.group(1));
- ep.setPort(KnativeHttp.DEFAULT_PORT);
- ep.setPath(KnativeHttp.DEFAULT_PATH);
- break;
- case 2:
- ep.setHost(matcher.group(1));
- ep.setPort(parseInt(matcher.group(2)));
- ep.setPath(KnativeHttp.DEFAULT_PATH);
- break;
- case 3:
- ep.setHost(matcher.group(1));
- ep.setPort(parseInt(matcher.group(2)));
- ep.setPath(KnativeHttp.DEFAULT_PATH + matcher.group(3));
- break;
- default:
- throw new IllegalArgumentException("Bad URI: " + remaining);
- }
-
- setProperties(ep, parameters);
-
- return ep;
- }
-
- public Vertx getVertx() {
- return vertx;
- }
-
- public void setVertx(Vertx vertx) {
- this.vertx = vertx;
- }
-
- public VertxOptions getVertxOptions() {
- return vertxOptions;
- }
-
- public void setVertxOptions(VertxOptions vertxOptions) {
- this.vertxOptions = vertxOptions;
- }
-
- public HttpServerOptions getVertxHttpServerOptions() {
- return vertxHttpServerOptions;
- }
-
- public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) {
- this.vertxHttpServerOptions = vertxHttpServerOptions;
- }
+ // *****************************
+ //
+ //
+ //
+ // *****************************
- public WebClientOptions getVertxHttpClientOptions() {
- return vertxHttpClientOptions;
- }
-
- public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) {
- this.vertxHttpClientOptions = vertxHttpClientOptions;
+ @Override
+ public Producer createProducer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+ return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions);
}
- KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) {
- return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions));
+ @Override
+ public Consumer createConsumer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
+ return new KnativeHttpConsumer(this, endpoint, service, processor);
}
}
diff --git a/camel-knative/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/knative/transport/http b/camel-knative/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/knative/transport/http
new file mode 100644
index 0000000..cd5c081
--- /dev/null
+++ b/camel-knative/camel-knative-http/src/main/resources/META-INF/services/org/apache/camel/knative/transport/http
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class = org.apache.camel.component.knative.http.KnativeHttpTransport
\ No newline at end of file
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
new file mode 100644
index 0000000..9898f22
--- /dev/null
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.knative.KnativeComponent;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.CloudEvents;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.http.common.HttpOperationFailedException;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KnativeHttpTest {
+
+ private CamelContext context;
+ private ProducerTemplate template;
+ private int port;
+
+ // **************************
+ //
+ // Setup
+ //
+ // **************************
+
+ @BeforeEach
+ public void before() {
+ this.context = new DefaultCamelContext();
+ this.template = this.context.createProducerTemplate();
+ this.port = AvailablePortFinder.getNextAvailable();
+ }
+
+ @AfterEach
+ public void after() {
+ if (this.context != null) {
+ this.context.stop();
+ }
+ }
+
+ // **************************
+ //
+ // Tests
+ //
+ // **************************
+
+ @Test
+ void testCreateComponent() {
+ context.start();
+
+ assertThat(context.getComponent("knative")).isInstanceOfSatisfying(KnativeComponent.class, c -> {
+ assertThat(c.getTransport()).isInstanceOf(KnativeHttpTransport.class);
+ });
+
+ }
+
+ private static Stream<Arguments> provideCloudEventsImplementations() {
+ return Stream.of(
+ Arguments.of(CloudEvents.V01),
+ Arguments.of(CloudEvents.V02)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testInvokeEndpoint(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.sink,
+ "myEndpoint",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setProtocol(Knative.Protocol.http);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("knative:endpoint/myEndpoint");
+ fromF("undertow:http://localhost:%d/a/path", port)
+ .to("mock:ce");
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock.expectedHeaderReceived(ce.attributes().source(), "knative://endpoint/myEndpoint");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id()));
+ mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
+
+ context.createProducerTemplate().sendBody("direct:source", "test");
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testConsumeStructuredContent(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "myEndpoint",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint")
+ .to("mock:ce");
+ from("direct:source")
+ .toF("undertow:http://localhost:%d/a/path", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock.expectedHeaderReceived(ce.attributes().id(), "myEventID");
+ mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+
+ if (Objects.equals(ce.version(), ce.version())) {
+ e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
+ "cloudEventsVersion", ce.version(),
+ "eventType", "org.apache.camel.event",
+ "eventID", "myEventID",
+ "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+ "source", "/somewhere",
+ "contentType", "text/plain",
+ "data", "test"
+ )));
+ }
+ if (Objects.equals(CloudEvents.V02.version(), ce.version())) {
+ e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
+ "specversion", ce.version(),
+ "type", "org.apache.camel.event",
+ "id", "myEventID",
+ "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+ "source", "/somewhere",
+ "contenttype", "text/plain",
+ "data", "test"
+ )));
+ }
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testConsumeContent(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "myEndpoint",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint")
+ .to("mock:ce");
+
+ from("direct:source")
+ .toF("undertow:http://localhost:%d/a/path", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock.expectedHeaderReceived(ce.attributes().id(), "myEventID");
+ mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "/somewhere");
+ e.getMessage().setBody("test");
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testConsumeContentWithFilter(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep1",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE1"
+ )),
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep2",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE2"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/ep1")
+ .convertBodyTo(String.class)
+ .to("log:ce1?showAll=true&multiline=true")
+ .to("mock:ce1");
+ from("knative:endpoint/ep2")
+ .convertBodyTo(String.class)
+ .to("log:ce2?showAll=true&multiline=true")
+ .to("mock:ce2");
+
+ from("direct:source")
+ .setBody()
+ .constant("test")
+ .setHeader(Exchange.HTTP_METHOD)
+ .constant("POST")
+ .toD("undertow:http://localhost:" + port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+ mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock1.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1");
+ mock1.expectedHeaderReceived(ce.attributes().source(), "CE1");
+ mock1.expectedBodiesReceived("test");
+ mock1.expectedMessageCount(1);
+
+ MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+ mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock2.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2");
+ mock2.expectedHeaderReceived(ce.attributes().source(), "CE2");
+ mock2.expectedBodiesReceived("test");
+ mock2.expectedMessageCount(1);
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID1");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "CE1");
+ }
+ );
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID2");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "CE2");
+ }
+ );
+
+ mock1.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep1",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE[01234]"
+ )),
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep2",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE[56789]"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/ep1")
+ .convertBodyTo(String.class)
+ .to("log:ce1?showAll=true&multiline=true")
+ .to("mock:ce1");
+ from("knative:endpoint/ep2")
+ .convertBodyTo(String.class)
+ .to("log:ce2?showAll=true&multiline=true")
+ .to("mock:ce2");
+
+ from("direct:source")
+ .setBody()
+ .constant("test")
+ .setHeader(Exchange.HTTP_METHOD)
+ .constant("POST")
+ .toD("undertow:http://localhost:" + port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+ mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock1.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1");
+ mock1.expectedHeaderReceived(ce.attributes().source(), "CE0");
+ mock1.expectedBodiesReceived("test");
+ mock1.expectedMessageCount(1);
+
+ MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+ mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock2.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2");
+ mock2.expectedHeaderReceived(ce.attributes().source(), "CE5");
+ mock2.expectedBodiesReceived("test");
+ mock2.expectedMessageCount(1);
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID1");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "CE0");
+ }
+ );
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID2");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "CE5");
+ }
+ );
+
+ mock1.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testConsumeEventContent(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.event(
+ Knative.EndpointKind.source,
+ "default",
+ "localhost",
+ port)
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:event/event1")
+ .convertBodyTo(String.class)
+ .to("log:ce1?showAll=true&multiline=true")
+ .to("mock:ce1");
+ from("knative:event/event2")
+ .convertBodyTo(String.class)
+ .to("log:ce2?showAll=true&multiline=true")
+ .to("mock:ce2");
+
+ from("direct:source")
+ .setBody()
+ .constant("test")
+ .setHeader(Exchange.HTTP_METHOD)
+ .constant("POST")
+ .toD("undertow:http://localhost:" + port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+ mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock1.expectedHeaderReceived(ce.attributes().type(), "event1");
+ mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1");
+ mock1.expectedHeaderReceived(ce.attributes().source(), "CE1");
+ mock1.expectedBodiesReceived("test");
+ mock1.expectedMessageCount(1);
+
+ MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+ mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock2.expectedHeaderReceived(ce.attributes().type(), "event2");
+ mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2");
+ mock2.expectedHeaderReceived(ce.attributes().source(), "CE2");
+ mock2.expectedBodiesReceived("test");
+ mock2.expectedMessageCount(1);
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "event1");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID1");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "CE1");
+ }
+ );
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "event2");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID2");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "CE2");
+ }
+ );
+
+ mock1.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testReply(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "from",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )),
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.sink,
+ "to",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/from")
+ .convertBodyTo(String.class)
+ .setBody()
+ .constant("consumer");
+ from("direct:source")
+ .to("knative://endpoint/to")
+ .log("${body}")
+ .to("mock:to");
+ }
+ });
+
+ MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
+ mock.expectedBodiesReceived("consumer");
+ mock.expectedMessageCount(1);
+
+ context.start();
+ context.createProducerTemplate().sendBody("direct:source", "");
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testInvokeServiceWithoutHost(CloudEvent ce) throws Exception {
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.sink,
+ "test",
+ "",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ ));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/test")
+ .to("mock:start");
+ });
+
+ context.start();
+
+ Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
+ assertThat(exchange.isFailed()).isTrue();
+ assertThat(exchange.getException()).isInstanceOf(CamelException.class);
+ assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed because host is not defined");
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception {
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.sink,
+ "test",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ ));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/test")
+ .to("mock:start");
+ });
+
+ context.start();
+
+ Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
+ assertThat(exchange.isFailed()).isTrue();
+ assertThat(exchange.getException()).isInstanceOf(CamelException.class);
+ assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking http://localhost:" + port + "/");
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testRemoveConsumer(CloudEvent ce) throws Exception {
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep1",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
+ )
+ ),
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep2",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
+ )
+ )
+ ));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("knative:endpoint/ep1")
+ .routeId("r1")
+ .setBody().simple("${routeId}");
+ b.from("knative:endpoint/ep2")
+ .routeId("r2")
+ .setBody().simple("${routeId}");
+ });
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .setHeader("h").body()
+ .toF("undertow:http://localhost:%d", port);
+ });
+
+ context.start();
+
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
+
+ context.getRouteController().stopRoute("r2");
+
+ assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
+ assertThat(e.isFailed()).isTrue();
+ assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testAddConsumer(CloudEvent ce) throws Exception {
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep1",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
+ )
+ ),
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "ep2",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
+ )
+ )
+ ));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("knative:endpoint/ep1")
+ .routeId("r1")
+ .setBody().simple("${routeId}");
+ });
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .setHeader("h").body()
+ .toF("undertow:http://localhost:%d", port);
+ });
+
+ context.start();
+
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
+ assertThat(e.isFailed()).isTrue();
+ assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+ });
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("knative:endpoint/ep2")
+ .routeId("r2")
+ .setBody().simple("${routeId}");
+ });
+
+ assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+ assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testInvokeEndpointWithError(CloudEvent ce) throws Exception {
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ ));
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/ep")
+ .to("mock:start");
+ b.fromF("undertow:http://0.0.0.0:%d", port)
+ .routeId("endpoint")
+ .process(e -> {
+ throw new RuntimeException("endpoint error");
+ });
+ });
+
+ context.start();
+
+ Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(""));
+ assertThat(exchange.isFailed()).isTrue();
+ assertThat(exchange.getException()).isInstanceOf(CamelException.class);
+ assertThat(exchange.getException()).hasMessageStartingWith("HTTP operation failed invoking");
+ assertThat(exchange.getException()).hasMessageContaining("with statusCode: 500, statusMessage: Internal Server Error");
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testEvents(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.event(
+ Knative.EndpointKind.sink,
+ "default",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )),
+ KnativeEnvironment.event(
+ Knative.EndpointKind.source,
+ "default",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setProtocol(Knative.Protocol.http);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("knative:event/myEvent");
+ fromF("knative:event/myEvent")
+ .to("mock:ce");
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock.expectedHeaderReceived(ce.attributes().type(), "myEvent");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id()));
+ mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
+
+ context.createProducerTemplate().sendBody("direct:source", "test");
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testEventsWithTypeAndVersion(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.event(
+ Knative.EndpointKind.sink,
+ "default",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_KIND, "MyObject",
+ Knative.KNATIVE_API_VERSION, "v1"
+ )),
+ KnativeEnvironment.event(
+ Knative.EndpointKind.source,
+ "default",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_KIND, "MyOtherObject",
+ Knative.KNATIVE_API_VERSION, "v2"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setProtocol(Knative.Protocol.http);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("knative:event/myEvent?kind=MyObject&apiVersion=v1");
+
+ fromF("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
+ .to("mock:ce");
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock.expectedHeaderReceived(ce.attributes().type(), "myEvent");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id()));
+ mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
+
+ context.createProducerTemplate().sendBody("direct:source", "test");
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testConsumeContentWithTypeAndVersion(CloudEvent ce) throws Exception {
+ KnativeEnvironment env = KnativeEnvironment.on(
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "myEndpoint",
+ "localhost",
+ port + 1,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_KIND, "MyObject",
+ Knative.KNATIVE_API_VERSION, "v1"
+ )),
+ KnativeEnvironment.endpoint(
+ Knative.EndpointKind.source,
+ "myEndpoint",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_KIND, "MyObject",
+ Knative.KNATIVE_API_VERSION, "v2"
+ ))
+ );
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(ce.version());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint?kind=MyObject&apiVersion=v2")
+ .to("mock:ce");
+ from("direct:source")
+ .toF("undertow:http://localhost:%d", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
+ mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
+ mock.expectedHeaderReceived(ce.attributes().id(), "myEventID");
+ mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+ mock.expectedBodiesReceived("test");
+ mock.expectedMessageCount(1);
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
+ e.getMessage().setHeader(ce.attributes().spec(), ce.version());
+ e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
+ e.getMessage().setHeader(ce.attributes().id(), "myEventID");
+ e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getMessage().setHeader(ce.attributes().source(), "/somewhere");
+ e.getMessage().setBody("test");
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+}
+
diff --git a/camel-knative/src/test/resources/log4j2-test.xml b/camel-knative/camel-knative-http/src/test/resources/log4j2-test.xml
similarity index 100%
rename from camel-knative/src/test/resources/log4j2-test.xml
rename to camel-knative/camel-knative-http/src/test/resources/log4j2-test.xml
diff --git a/camel-knative/pom.xml b/camel-knative/camel-knative/pom.xml
similarity index 95%
copy from camel-knative/pom.xml
copy to camel-knative/camel-knative/pom.xml
index ab73947..919e6a5 100644
--- a/camel-knative/pom.xml
+++ b/camel-knative/camel-knative/pom.xml
@@ -20,7 +20,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.k</groupId>
- <artifactId>camel-k-runtime-parent</artifactId>
+ <artifactId>camel-knative-parent</artifactId>
<version>1.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -42,11 +42,6 @@
</dependency>
<dependency>
- <groupId>org.apache.camel.k</groupId>
- <artifactId>camel-knative-http</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-engine</artifactId>
<scope>provided</scope>
@@ -56,6 +51,10 @@
<artifactId>camel-cloud</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.k</groupId>
+ <artifactId>camel-knative-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -154,6 +153,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
similarity index 65%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 79641ac..90064c4 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -20,10 +20,17 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeTransport;
+import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.PropertiesHelper;
import org.apache.camel.util.StringHelper;
@@ -35,6 +42,14 @@ public class KnativeComponent extends DefaultComponent {
private KnativeConfiguration configuration;
private String environmentPath;
+ @Metadata(defaultValue = "http")
+ private Knative.Protocol protocol = Knative.Protocol.http;
+
+ @Metadata(defaultValue = "http")
+ private KnativeTransport transport;
+
+ private boolean managedTransport = true;
+
public KnativeComponent() {
this(null);
}
@@ -111,6 +126,66 @@ public class KnativeComponent extends DefaultComponent {
configuration.setTransportOptions(transportOptions);
}
+ public Knative.Protocol getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * Protocol.
+ */
+ public KnativeComponent setProtocol(Knative.Protocol protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+
+ public KnativeTransport getTransport() {
+ return transport;
+ }
+
+ /**
+ * The transport implementation.
+ */
+ public void setTransport(KnativeTransport transport) {
+ this.transport = transport;
+ }
+
+ // ************************
+ //
+ // Lifecycle
+ //
+ // ************************
+
+ @Override
+ protected void doInit() throws Exception {
+ if (transport == null) {
+ this.transport = getCamelContext().getRegistry().lookupByNameAndType(protocol.name(), KnativeTransport.class);
+
+ if (this.transport == null) {
+ this.transport = getCamelContext()
+ .adapt(ExtendedCamelContext.class)
+ .getFactoryFinder(Knative.KNATIVE_TRANSPORT_RESOURCE_PATH)
+ .newInstance(protocol.name(), KnativeTransport.class)
+ .orElseThrow(() -> new RuntimeException("Error creating knative transport for protocol: " + protocol.name()));
+
+ CamelContextAware.trySetCamelContext(transport, getCamelContext());
+ }
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (this.transport != null && managedTransport) {
+ ServiceHelper.startService(this.transport);
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (this.transport != null && managedTransport) {
+ ServiceHelper.stopService(this.transport);
+ }
+ }
+
// ************************
//
//
@@ -119,10 +194,17 @@ public class KnativeComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ if (ObjectHelper.isEmpty(remaining)) {
+ throw new IllegalArgumentException("Expecting URI in the forof: 'knative:type/name', got '" + uri + "'");
+ }
+
final String type = StringHelper.before(remaining, "/");
- final String target = StringHelper.after(remaining, "/");
+ final String name = StringHelper.after(remaining, "/");
final KnativeConfiguration conf = getKnativeConfiguration();
+ conf.getFilters().putAll(
+ PropertiesHelper.extractProperties(parameters, "filter.", true)
+ );
conf.getTransportOptions().putAll(
PropertiesHelper.extractProperties(parameters, "transport.", true)
);
@@ -130,7 +212,11 @@ public class KnativeComponent extends DefaultComponent {
// set properties from the endpoint uri
PropertyBindingSupport.bindProperties(getCamelContext(), conf, parameters);
- return new KnativeEndpoint(uri, this, Knative.Type.valueOf(type), target, conf);
+ if (ObjectHelper.isEmpty(conf.getServiceName())) {
+ conf.setServiceName(name);
+ }
+
+ return new KnativeEndpoint(uri, this, Knative.Type.valueOf(type), name, this.transport, conf);
}
// ************************
@@ -145,6 +231,9 @@ public class KnativeComponent extends DefaultComponent {
if (conf.getTransportOptions() == null) {
conf.setTransportOptions(new HashMap<>());
}
+ if (conf.getFilters() == null) {
+ conf.setFilters(new HashMap<>());
+ }
if (conf.getEnvironment() == null) {
String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE);
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
similarity index 68%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 2358583..3d268de 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -19,23 +19,31 @@ package org.apache.camel.component.knative;
import java.util.Map;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.knative.spi.CloudEvents;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+@UriParams
public class KnativeConfiguration implements Cloneable {
@UriParam
private KnativeEnvironment environment;
-
+ @UriParam
+ private String serviceName;
@UriParam(defaultValue = "false")
private boolean jsonSerializationEnabled;
-
@UriParam(defaultValue = "0.2", enums = "0.1,0.2")
- private String cloudEventsSpecVersion = "0.2";
-
+ private String cloudEventsSpecVersion = CloudEvents.V02.version();
@UriParam(defaultValue = "org.apache.camel.event")
private String cloudEventsType = "org.apache.camel.event";
-
@UriParam(prefix = "transport.")
private Map<String, Object> transportOptions;
+ @UriParam(prefix = "filter.")
+ private Map<String, Object> filters;
+ @UriParam(label = "advanced")
+ private String apiVersion;
+ @UriParam(label = "advanced")
+ private String kind;
public KnativeConfiguration() {
}
@@ -57,11 +65,21 @@ public class KnativeConfiguration implements Cloneable {
this.environment = environment;
}
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /**
+ * The name of the service to lookup from the {@link KnativeEnvironment}.
+ */
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
public boolean isJsonSerializationEnabled() {
return jsonSerializationEnabled;
}
-
/**
* Enables automatic serialization to JSON of the produced events.
*/
@@ -102,6 +120,41 @@ public class KnativeConfiguration implements Cloneable {
this.transportOptions = transportOptions;
}
+ public Map<String, Object> getFilters() {
+ return filters;
+ }
+
+ /**
+ * Set the filters.
+ */
+ public void setFilters(Map<String, Object> filters) {
+ this.filters = filters;
+ }
+
+ public String getApiVersion() {
+ return apiVersion;
+ }
+
+ /**
+ * The version of the k8s resource referenced by the endpoint.
+ */
+ public KnativeConfiguration setApiVersion(String apiVersion) {
+ this.apiVersion = apiVersion;
+ return this;
+ }
+
+ public String getKind() {
+ return kind;
+ }
+
+ /**
+ * The type of the k8s resource referenced by the endpoint.
+ */
+ public KnativeConfiguration setKind(String kind) {
+ this.kind = kind;
+ return this;
+ }
+
// ************************
//
// Cloneable
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
similarity index 96%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
index 65a04b8..6573820 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.knative;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.knative.spi.Knative;
/**
* Converts objects prior to serializing them to external endpoints or channels
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
new file mode 100644
index 0000000..a71b954
--- /dev/null
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.knative.ce.CloudEventProcessor;
+import org.apache.camel.component.knative.ce.CloudEventProcessors;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeTransport;
+import org.apache.camel.processor.Pipeline;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.PropertyBindingSupport;
+
+
+@UriEndpoint(
+ firstVersion = "3.0.0",
+ scheme = "knative",
+ syntax = "knative:type/name",
+ title = "Knative",
+ label = "cloud,eventing")
+public class KnativeEndpoint extends DefaultEndpoint {
+ @UriPath(description = "The Knative type")
+ private final Knative.Type type;
+ @UriPath(description = "The Knative name")
+ private final String name;
+
+ @UriParam
+ private KnativeConfiguration configuration;
+
+ private final KnativeTransport transport;
+ private final CloudEventProcessor cloudEvent;
+
+ public KnativeEndpoint(String uri, KnativeComponent component, Knative.Type type, String name, KnativeTransport transport, KnativeConfiguration configuration) {
+ super(uri, component);
+
+ this.type = type;
+ this.name = name;
+ this.transport = transport;
+ this.configuration = configuration;
+ this.cloudEvent = CloudEventProcessors.fromSpecVersion(configuration.getCloudEventsSpecVersion());
+ }
+
+ @Override
+ public KnativeComponent getComponent() {
+ return (KnativeComponent) super.getComponent();
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.sink);
+ final Processor ceProcessor = cloudEvent.producer(this, service);
+ final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
+ final Producer producer = transport.createProducer(this, service);
+
+ PropertyBindingSupport.build()
+ .withCamelContext(getCamelContext())
+ .withProperties(configuration.getTransportOptions())
+ .withRemoveParameters(false)
+ .withTarget(producer)
+ .bind();
+
+ return new KnativeProducer(this, ceProcessor, ceConverter, producer);
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
+ final Processor ceProcessor = cloudEvent.consumer(this, service);
+ final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
+ final Consumer consumer = transport.createConsumer(this, service, pipeline);
+
+ PropertyBindingSupport.build()
+ .withCamelContext(getCamelContext())
+ .withProperties(configuration.getTransportOptions())
+ .withRemoveParameters(false)
+ .withTarget(consumer)
+ .bind();
+
+ configureConsumer(consumer);
+
+ return consumer;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public Knative.Type getType() {
+ return type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setConfiguration(KnativeConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ public KnativeConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.EndpointKind endpointKind) {
+ String serviceName = configuration.getServiceName();
+
+ //
+ // look-up service definition by service name first then if not found try to look it up by using
+ // "default" as a service name. For channels and endpoints, the service name can be derived from
+ // the endpoint uri but for events it is not possible so default should always be there for events
+ // unless the service name is define as an endpoint option.
+ //
+ Optional<KnativeEnvironment.KnativeServiceDefinition> service = lookupServiceDefinition(serviceName, endpointKind);
+ if (!service.isPresent()) {
+ service = lookupServiceDefinition("default", endpointKind);
+ }
+ if (!service.isPresent()) {
+ throw new IllegalArgumentException(String.format("Unable to find a service definition for %s/%s/%s", type, serviceName, endpointKind));
+ }
+
+ Map<String, String> metadata = new HashMap<>();
+ metadata.putAll(service.get().getMetadata());
+
+ for (Map.Entry<String, Object> entry: configuration.getFilters().entrySet()) {
+ String key = entry.getKey();
+ Object val = entry.getValue();
+
+ if (val instanceof String) {
+ if (!key.startsWith(Knative.KNATIVE_FILTER_PREFIX)) {
+ key = Knative.KNATIVE_FILTER_PREFIX + key;
+ }
+
+ metadata.put(key, (String)val);
+ }
+ }
+
+ if (service.get().getType() == Knative.Type.event) {
+ metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName);
+ metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().attributes().type(), serviceName);
+ }
+
+ return new KnativeEnvironment.KnativeServiceDefinition(
+ service.get().getType(),
+ service.get().getName(),
+ service.get().getHost(),
+ service.get().getPort(),
+ metadata
+ );
+ }
+
+ Optional<KnativeEnvironment.KnativeServiceDefinition> lookupServiceDefinition(String name, Knative.EndpointKind endpointKind) {
+ return this.configuration.getEnvironment()
+ .lookup(this.type, name)
+ .filter(s -> {
+ final String type = s.getMetadata().get(Knative.CAMEL_ENDPOINT_KIND);
+ final String apiv = s.getMetadata().get(Knative.KNATIVE_API_VERSION);
+ final String kind = s.getMetadata().get(Knative.KNATIVE_KIND);
+
+ if (!Objects.equals(endpointKind.name(), type)) {
+ return false;
+ }
+ if (configuration.getApiVersion() != null && !Objects.equals(apiv, configuration.getApiVersion())) {
+ return false;
+ }
+ if (configuration.getKind() != null && !Objects.equals(kind, configuration.getKind())) {
+ return false;
+ }
+
+ return true;
+ })
+ .findFirst();
+ }
+}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeProducer.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeProducer.java
similarity index 99%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/KnativeProducer.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeProducer.java
index ba027d6..f503a8e 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeProducer.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeProducer.java
@@ -75,5 +75,4 @@ public class KnativeProducer extends DefaultAsyncProducer {
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(processor);
}
-
}
diff --git a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java
similarity index 54%
copy from camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
copy to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java
index 3c08aba..4c76029 100644
--- a/camel-k-quarkus/camel-k-quarkus-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/DeploymentProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java
@@ -14,17 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.k.quarkus.knative.deployment;
+package org.apache.camel.component.knative.ce;
-import io.quarkus.deployment.annotations.BuildProducer;
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
-import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.KnativeEndpoint;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
-public class DeploymentProcessor {
- @BuildStep
- void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, KnativeEnvironment.class));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, KnativeEnvironment.KnativeServiceDefinition.class));
- }
+public interface CloudEventProcessor {
+ CloudEvent cloudEvent();
+ Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service);
+ Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service);
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
new file mode 100644
index 0000000..75f1654
--- /dev/null
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.util.Objects;
+
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.KnativeEndpoint;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+
+public enum CloudEventProcessors implements CloudEventProcessor {
+ V01(new CloudEventV01Processor()),
+ V02(new CloudEventV02Processor());
+
+ private final CloudEventProcessor instance;
+
+ CloudEventProcessors(CloudEventProcessor instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public CloudEvent cloudEvent() {
+ return instance.cloudEvent();
+ }
+
+ @Override
+ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+ return instance.consumer(endpoint, service);
+ }
+
+ @Override
+ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+ return instance.producer(endpoint, service);
+ }
+
+ public static CloudEventProcessor fromSpecVersion(String version) {
+ for (CloudEventProcessor processor: CloudEventProcessors.values()) {
+ if (Objects.equals(processor.cloudEvent().version(), version)) {
+ return processor;
+ }
+ }
+
+ throw new IllegalArgumentException("Unable to find an implementation fo CloudEvents spec: " + version);
+ }
+}
+
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java
similarity index 72%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java
index 6921e7c..3920835 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java
@@ -21,50 +21,62 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
-import java.util.function.Function;
+import java.util.Objects;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.component.knative.Knative;
import org.apache.camel.component.knative.KnativeEndpoint;
-import org.apache.camel.component.knative.KnativeEnvironment;
-import org.apache.camel.component.knative.KnativeSupport;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.CloudEvents;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.commons.lang3.StringUtils;
import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
-final class V01 {
- public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> {
- KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService();
- String uri = endpoint.getEndpointUri();
+final class CloudEventV01Processor implements CloudEventProcessor {
+ private final CloudEvent cloudEvent;
+ public CloudEventV01Processor() {
+ this.cloudEvent = CloudEvents.V01;
+ }
+
+ @Override
+ public CloudEvent cloudEvent() {
+ return cloudEvent;
+ }
+
+ @Override
+ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
return exchange -> {
String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
if (eventType == null) {
eventType = endpoint.getConfiguration().getCloudEventsType();
}
+
final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
final Map<String, Object> headers = exchange.getIn().getHeaders();
- headers.putIfAbsent("CE-CloudEventsVersion", "0.1");
- headers.putIfAbsent("CE-EventType", eventType);
- headers.putIfAbsent("CE-EventID", exchange.getExchangeId());
- headers.putIfAbsent("CE-EventTime", eventTime);
- headers.putIfAbsent("CE-Source", uri);
+ headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId());
+ headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri());
+ headers.putIfAbsent(cloudEvent.attributes().spec(), cloudEvent.version());
+ headers.putIfAbsent(cloudEvent.attributes().type(), eventType);
+ headers.putIfAbsent(cloudEvent.attributes().time(), eventTime);
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
// Always remove host so it's always computed from the URL and not inherited from the exchange
headers.remove("Host");
};
- };
+ }
@SuppressWarnings("unchecked")
- public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> {
+ @Override
+ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
return exchange -> {
- if (!KnativeSupport.hasStructuredContent(exchange)) {
+ if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) {
//
// The event is not in the form of Structured Content Mode
// then leave it as it is.
@@ -99,8 +111,5 @@ final class V01 {
});
}
};
- };
-
- private V01() {
}
}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java
similarity index 71%
rename from camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java
index 857d87c..f41c52c 100644
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java
@@ -21,50 +21,62 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
-import java.util.function.Function;
+import java.util.Objects;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.component.knative.Knative;
import org.apache.camel.component.knative.KnativeEndpoint;
-import org.apache.camel.component.knative.KnativeEnvironment;
-import org.apache.camel.component.knative.KnativeSupport;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.CloudEvents;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.commons.lang3.StringUtils;
import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
-final class V02 {
- public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> {
- KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService();
- String uri = endpoint.getEndpointUri();
+final class CloudEventV02Processor implements CloudEventProcessor {
+ private final CloudEvent cloudEvent;
+ public CloudEventV02Processor() {
+ this.cloudEvent = CloudEvents.V02;
+ }
+
+ @Override
+ public CloudEvent cloudEvent() {
+ return cloudEvent;
+ }
+
+ @Override
+ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
return exchange -> {
String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
if (eventType == null) {
eventType = endpoint.getConfiguration().getCloudEventsType();
}
+
final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
final Map<String, Object> headers = exchange.getIn().getHeaders();
- headers.putIfAbsent("ce-specversion", "0.2");
- headers.putIfAbsent("ce-type", eventType);
- headers.putIfAbsent("ce-id", exchange.getExchangeId());
- headers.putIfAbsent("ce-time", eventTime);
- headers.putIfAbsent("ce-source", uri);
+ headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId());
+ headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri());
+ headers.putIfAbsent(cloudEvent.attributes().spec(), cloudEvent.version());
+ headers.putIfAbsent(cloudEvent.attributes().type(), eventType);
+ headers.putIfAbsent(cloudEvent.attributes().time(), eventTime);
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
// Always remove host so it's always computed from the URL and not inherited from the exchange
headers.remove("Host");
};
- };
+ }
@SuppressWarnings("unchecked")
- public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> {
+ @Override
+ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
return exchange -> {
- if (!KnativeSupport.hasStructuredContent(exchange)) {
+ if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) {
//
// The event is not in the form of Structured Content Mode
// then leave it as it is.
@@ -82,7 +94,7 @@ final class V02 {
final Message message = exchange.getIn();
final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
- ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
+ ifNotEmpty(ce.remove("contenttype"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
//
@@ -99,8 +111,5 @@ final class V02 {
});
}
};
- };
-
- private V02() {
}
}
diff --git a/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
new file mode 100644
index 0000000..14df215
--- /dev/null
+++ b/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.knative.spi.Knative;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.knative.spi.KnativeEnvironment.mandatoryLoadFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KnativeComponentTest {
+
+ private CamelContext context;
+
+ // **************************
+ //
+ // Setup
+ //
+ // **************************
+
+ @BeforeEach
+ public void before() {
+ this.context = new DefaultCamelContext();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ if (this.context != null) {
+ this.context.stop();
+ }
+ }
+
+ // **************************
+ //
+ // Common Tests
+ //
+ // **************************
+
+ @Test
+ void testLoadEnvironment() throws Exception {
+ KnativeEnvironment env = mandatoryLoadFromResource(context, "classpath:/environment.json");
+
+ assertThat(env.stream()).hasSize(3);
+ assertThat(env.stream()).anyMatch(s -> s.getType() == Knative.Type.channel);
+ assertThat(env.stream()).anyMatch(s -> s.getType() == Knative.Type.endpoint);
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setEnvironment(env);
+
+ //
+ // Channels
+ //
+ {
+ KnativeEndpoint endpoint = context.getEndpoint("knative:channel/c1", KnativeEndpoint.class);
+ assertThat(endpoint.lookupServiceDefinition("c1", Knative.EndpointKind.source)).isPresent();
+ assertThat(endpoint.lookupServiceDefinition("e1", Knative.EndpointKind.source)).isNotPresent();
+ }
+
+ //
+ // Endpoints
+ //
+ {
+ KnativeEndpoint endpoint = context.getEndpoint("knative:endpoint/e1", KnativeEndpoint.class);
+ assertThat(endpoint.lookupServiceDefinition("e1", Knative.EndpointKind.source)).isPresent();
+ assertThat(endpoint.lookupServiceDefinition("c1", Knative.EndpointKind.source)).isNotPresent();
+ }
+ }
+}
diff --git a/camel-knative/src/test/resources/environment.json b/camel-knative/camel-knative/src/test/resources/environment.json
similarity index 69%
rename from camel-knative/src/test/resources/environment.json
rename to camel-knative/camel-knative/src/test/resources/environment.json
index 03c0dfe..0c6a571 100644
--- a/camel-knative/src/test/resources/environment.json
+++ b/camel-knative/camel-knative/src/test/resources/environment.json
@@ -2,36 +2,36 @@
"services": [
{
"type": "channel",
- "protocol": "http",
"name": "c1",
"host": "localhost",
"port": "8001",
"metadata": {
"service.path": "",
- "knative.event.type": ""
+ "knative.event.type": "",
+ "camel.endpoint.kind": "source"
}
},
{
"type": "endpoint",
- "protocol": "http",
"name": "e1",
"host": "localhost",
"port": "9001",
"metadata": {
"service.path": "",
- "knative.event.type": ""
+ "knative.event.type": "",
+ "camel.endpoint.kind": "source"
}
},
{
"type": "endpoint",
- "protocol": "http",
"name": "default",
"host": "0.0.0.0",
"port": "8080",
"metadata": {
"service.path": "",
- "knative.event.type": ""
+ "knative.event.type": "",
+ "camel.endpoint.kind": "source"
}
}
]
-}
\ No newline at end of file
+}
diff --git a/camel-knative-http/src/test/resources/log4j2-test.xml b/camel-knative/camel-knative/src/test/resources/log4j2-test.xml
similarity index 96%
rename from camel-knative-http/src/test/resources/log4j2-test.xml
rename to camel-knative/camel-knative/src/test/resources/log4j2-test.xml
index 82b517b..486a0f0 100644
--- a/camel-knative-http/src/test/resources/log4j2-test.xml
+++ b/camel-knative/camel-knative/src/test/resources/log4j2-test.xml
@@ -30,7 +30,7 @@
<!--
<AppenderRef ref="STDOUT"/>
-->
- <AppenderRef ref="NONE"/>
+ <AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
diff --git a/camel-knative/pom.xml b/camel-knative/pom.xml
index ab73947..715d932 100644
--- a/camel-knative/pom.xml
+++ b/camel-knative/pom.xml
@@ -24,189 +24,15 @@
<version>1.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
+ <packaging>pom</packaging>
- <artifactId>camel-knative</artifactId>
+ <artifactId>camel-knative-parent</artifactId>
- <dependencies>
- <!-- ****************************** -->
- <!-- -->
- <!-- RUNTIME -->
- <!-- -->
- <!-- ****************************** -->
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.camel.k</groupId>
- <artifactId>camel-knative-http</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-core-engine</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-cloud</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>spi-annotations</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>apt</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.datatype</groupId>
- <artifactId>jackson-datatype-jdk8</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- <version>${commons-collections4.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${commons-lang.version}</version>
- </dependency>
-
- <!-- ****************************** -->
- <!-- -->
- <!-- TESTS -->
- <!-- -->
- <!-- ****************************** -->
-
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-test</artifactId>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-http</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-file</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-direct</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-mock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-log</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-properties</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-undertow</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <version>${assertj.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>${log4j2.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>${log4j2.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-package-maven-plugin</artifactId>
- <version>${camel.version}</version>
- <configuration>
- <failFast>false</failFast>
- </configuration>
- <executions>
- <execution>
- <id>generate</id>
- <goals>
- <goal>prepare-components</goal>
- </goals>
- <phase>process-classes</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.jboss.jandex</groupId>
- <artifactId>jandex-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>make-index</id>
- <goals>
- <goal>jandex</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <modules>
+ <module>camel-knative-api</module>
+ <module>camel-knative</module>
+ <module>camel-knative-http</module>
+ </modules>
</project>
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
deleted file mode 100644
index 31b6c70..0000000
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Consumer;
-import org.apache.camel.DelegateEndpoint;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.cloud.ServiceDefinition;
-import org.apache.camel.component.knative.ce.CloudEventsProcessors;
-import org.apache.camel.processor.Pipeline;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriPath;
-import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.StringHelper;
-import org.apache.camel.util.URISupport;
-
-
-@UriEndpoint(
- firstVersion = "3.0.0",
- scheme = "knative",
- syntax = "knative:type/target",
- title = "Knative",
- label = "cloud,eventing")
-public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint {
- @UriPath(description = "The Knative type")
- private final Knative.Type type;
- @UriPath(description = "The Knative name")
- private final String name;
-
- private final KnativeConfiguration configuration;
- private final KnativeEnvironment environment;
- private final KnativeEnvironment.KnativeServiceDefinition service;
- private final Endpoint endpoint;
-
- public KnativeEndpoint(String uri, KnativeComponent component, Knative.Type targetType, String remaining, KnativeConfiguration configuration) {
- super(uri, component);
-
- this.type = targetType;
- this.name = remaining.indexOf('/') != -1 ? StringHelper.before(remaining, "/") : remaining;
- this.configuration = configuration;
- this.environment = this.configuration.getEnvironment();
- this.service = this.environment.lookupServiceOrDefault(targetType, remaining);
-
- switch (service.getProtocol()) {
- case http:
- case https:
- this.endpoint = http(component.getCamelContext(), service, configuration.getTransportOptions());
- break;
- default:
- throw new IllegalArgumentException("unsupported protocol: " + this.service.getProtocol());
- }
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
- ServiceHelper.startService(endpoint);
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(endpoint);
- super.doStop();
- }
-
- @Override
- public KnativeComponent getComponent() {
- return (KnativeComponent) super.getComponent();
- }
-
- @Override
- public Producer createProducer() throws Exception {
- final String version = configuration.getCloudEventsSpecVersion();
- final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).producerProcessor(this);
- final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
-
- return new KnativeProducer(this, ceProcessor, ceConverter, endpoint.createProducer());
- }
-
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- final String version = configuration.getCloudEventsSpecVersion();
- final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).consumerProcessor(this);
- final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
- final Consumer consumer = endpoint.createConsumer(pipeline);
-
- configureConsumer(consumer);
-
- return consumer;
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- @Override
- public Endpoint getEndpoint() {
- return this.endpoint;
- }
-
- public Knative.Type getType() {
- return type;
- }
-
- public String getName() {
- return name;
- }
-
- public KnativeConfiguration getConfiguration() {
- return configuration;
- }
-
- public KnativeEnvironment.KnativeServiceDefinition getService() {
- return service;
- }
-
- // *****************************
- //
- // Helpers
- //
- // *****************************
-
- private static Endpoint http(CamelContext context, ServiceDefinition definition, Map<String, Object> transportOptions) {
- try {
- String scheme = Knative.HTTP_COMPONENT;
- String host = definition.getHost();
- int port = definition.getPort();
-
- if (port == -1) {
- port = Knative.DEFAULT_HTTP_PORT;
- }
- if (ObjectHelper.isEmpty(host)) {
- String name = definition.getName();
- String zone = definition.getMetadata().get(Knative.SERVICE_META_ZONE);
-
- if (ObjectHelper.isNotEmpty(zone)) {
- try {
- zone = context.resolvePropertyPlaceholders(zone);
- } catch (IllegalArgumentException e) {
- zone = null;
- }
- }
- if (ObjectHelper.isNotEmpty(zone)) {
- name = name + "." + zone;
- }
-
- host = name;
- }
-
- ObjectHelper.notNull(host, Knative.SERVICE_META_HOST);
-
- String uri = String.format("%s:%s:%s", scheme, host, port);
- String path = definition.getMetadata().get(Knative.SERVICE_META_PATH);
- if (path != null) {
- if (!path.startsWith("/")) {
- uri += "/";
- }
-
- uri += path;
- }
-
- final String filterKey = definition.getMetadata().get(Knative.FILTER_HEADER_NAME);
- final String filterVal = definition.getMetadata().get(Knative.FILTER_HEADER_VALUE);
- final Map<String, Object> parameters = new HashMap<>();
-
- parameters.putAll(transportOptions);
-
- if (ObjectHelper.isNotEmpty(filterKey) && ObjectHelper.isNotEmpty(filterVal)) {
- parameters.put("filter." + filterKey, filterVal);
- }
-
- uri = URISupport.appendParametersToURI(uri, parameters);
-
- return context.getEndpoint(uri);
- } catch (Exception e) {
- throw RuntimeCamelException.wrapRuntimeCamelException(e);
- }
- }
-}
diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java b/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java
deleted file mode 100644
index 4656504..0000000
--- a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.ce;
-
-import java.util.function.Function;
-
-import org.apache.camel.Processor;
-import org.apache.camel.component.knative.KnativeEndpoint;
-
-public enum CloudEventsProcessors {
- v01("0.1", V01.PRODUCER, V01.CONSUMER),
- v02("0.2", V02.PRODUCER, V02.CONSUMER);
-
- private final String version;
- private final Function<KnativeEndpoint, Processor> producer;
- private final Function<KnativeEndpoint, Processor> consumer;
-
- CloudEventsProcessors(String version, Function<KnativeEndpoint, Processor> producer, Function<KnativeEndpoint, Processor> consumer) {
- this.version = version;
- this.producer = producer;
- this.consumer = consumer;
- }
-
- public String getVersion() {
- return version;
- }
-
- public Processor producerProcessor(KnativeEndpoint endpoint) {
- return this.producer.apply(endpoint);
- }
-
- public Processor consumerProcessor(KnativeEndpoint endpoint) {
- return this.consumer.apply(endpoint);
- }
-
- // **************************
- //
- // Helpers
- //
- // **************************
-
- public static CloudEventsProcessors forSpecversion(String version) {
- for (CloudEventsProcessors ce : CloudEventsProcessors.values()) {
- if (ce.version.equals(version)) {
- return ce;
- }
- }
-
- throw new IllegalArgumentException("Unable to find processors for spec version: " + version);
- }
-}
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
deleted file mode 100644
index e0312ac..0000000
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.knative.ce.CloudEventsProcessors;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.AvailablePortFinder;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-public class CloudEventsV01Test {
-
- private CamelContext context;
-
- // **************************
- //
- // Setup
- //
- // **************************
-
- @BeforeEach
- public void before() {
- this.context = new DefaultCamelContext();
- }
-
- @AfterEach
- public void after() throws Exception {
- if (this.context != null) {
- this.context.stop();
- }
- }
-
- // **************************
- //
- // Tests
- //
- // **************************
-
- @Test
- void testInvokeEndpoint() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.custom-event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .to("knative:endpoint/myEndpoint");
-
- fromF("undertow:http://localhost:%d/a/path", port)
- .to("mock:ce");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.custom-event");
- mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setBody("test");
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testProduceDefaultEventType() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/",
- Knative.CONTENT_TYPE, "text/plain"
- )),
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint2",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/2",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .to("knative:endpoint/myEndpoint");
-
- from("direct:source2")
- .to("knative:endpoint/myEndpoint2?cloudEventsType=my.type");
-
- fromF("undertow:http://localhost:%d/", port)
- .to("mock:ce");
-
- fromF("undertow:http://localhost:%d/2", port)
- .to("mock:ce2");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock2.expectedHeaderReceived("CE-EventType", "my.type");
- mock2.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
- mock2.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
- mock2.expectedBodiesReceived("test2");
- mock2.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setBody("test");
- }
- );
- context.createProducerTemplate().send(
- "direct:source2",
- e -> {
- e.getIn().setBody("test2");
- }
- );
-
- mock.assertIsSatisfied();
- mock2.assertIsSatisfied();
- }
-
- @Test
- void testConsumeStructuredContent() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/myEndpoint")
- .to("mock:ce");
-
- from("direct:source")
- .toF("undertow:http://localhost:%d/a/path", port);
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock.expectedHeaderReceived("CE-EventID", "myEventID");
- mock.expectedHeaderReceived("CE-Source", "/somewhere");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
- e.getIn().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
- "cloudEventsVersion", CloudEventsProcessors.v01.getVersion(),
- "eventType", "org.apache.camel.event",
- "eventID", "myEventID",
- "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
- "source", "/somewhere",
- "data", "test"
- )));
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeContent() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/myEndpoint")
- .to("mock:ce");
-
- from("direct:source")
- .toF("undertow:http://localhost:%d/a/path", port);
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock.expectedHeaderReceived("CE-EventID", "myEventID");
- mock.expectedHeaderReceived("CE-Source", "/somewhere");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
- e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
- e.getIn().setHeader("CE-EventID", "myEventID");
- e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("CE-Source", "/somewhere");
- e.getIn().setBody("test");
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeContentWithFilter() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "ep1",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain",
- Knative.FILTER_HEADER_NAME, "CE-Source",
- Knative.FILTER_HEADER_VALUE, "CE1"
- )),
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "ep2",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain",
- Knative.FILTER_HEADER_NAME, "CE-Source",
- Knative.FILTER_HEADER_VALUE, "CE2"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/ep1")
- .convertBodyTo(String.class)
- .to("log:ce1?showAll=true&multiline=true")
- .to("mock:ce1");
- from("knative:endpoint/ep2")
- .convertBodyTo(String.class)
- .to("log:ce2?showAll=true&multiline=true")
- .to("mock:ce2");
-
- from("direct:source")
- .setBody()
- .constant("test")
- .setHeader(Exchange.HTTP_METHOD)
- .constant("POST")
- .setHeader(Exchange.HTTP_QUERY)
- .simple("filter.headerName=CE-Source&filter.headerValue=${header.FilterVal}")
- .toD("undertow:http://localhost:" + port);
- }
- });
-
- context.start();
-
- MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
- mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
- mock1.expectedHeaderReceived("CE-Source", "CE1");
- mock1.expectedBodiesReceived("test");
- mock1.expectedMessageCount(1);
-
- MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
- mock2.expectedHeaderReceived("CE-Source", "CE2");
- mock2.expectedBodiesReceived("test");
- mock2.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader("FilterVal", "CE1");
- e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
- e.getIn().setHeader("CE-EventID", "myEventID1");
- e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("CE-Source", "CE1");
- }
- );
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader("FilterVal", "CE2");
- e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
- e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
- e.getIn().setHeader("CE-EventID", "myEventID2");
- e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("CE-Source", "CE2");
- }
- );
-
- mock1.assertIsSatisfied();
- mock2.assertIsSatisfied();
- }
-}
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
deleted file mode 100644
index c633c1e..0000000
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.knative.ce.CloudEventsProcessors;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.AvailablePortFinder;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-public class CloudEventsV02Test {
-
- private CamelContext context;
-
- // **************************
- //
- // Setup
- //
- // **************************
-
- @BeforeEach
- public void before() {
- this.context = new DefaultCamelContext();
- }
-
- @AfterEach
- public void after() throws Exception {
- if (this.context != null) {
- this.context.stop();
- }
- }
-
- // **************************
- //
- // Tests
- //
- // **************************
-
- @Test
- void testInvokeEndpoint() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.custom-event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .to("knative:endpoint/myEndpoint");
-
- fromF("undertow:http://localhost:%d/a/path", port)
- .to("mock:ce");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock.expectedHeaderReceived("ce-type", "org.apache.camel.custom-event");
- mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setBody("test");
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testProduceDefaultEventType() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/",
- Knative.CONTENT_TYPE, "text/plain"
- )),
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint2",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/2",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .to("knative:endpoint/myEndpoint");
-
- from("direct:source2")
- .to("knative:endpoint/myEndpoint2?cloudEventsType=my.type");
-
- fromF("undertow:http://localhost:%d/", port)
- .to("mock:ce");
-
- fromF("undertow:http://localhost:%d/2", port)
- .to("mock:ce2");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
- mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock2.expectedHeaderReceived("ce-type", "my.type");
- mock2.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
- mock2.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
- mock2.expectedBodiesReceived("test2");
- mock2.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setBody("test");
- }
- );
- context.createProducerTemplate().send(
- "direct:source2",
- e -> {
- e.getIn().setBody("test2");
- }
- );
-
- mock.assertIsSatisfied();
- mock2.assertIsSatisfied();
- }
-
- @Test
- void testConsumeStructuredContent() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/myEndpoint")
- .to("mock:ce");
-
- from("direct:source")
- .toF("undertow:http://localhost:%d/a/path", port);
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
- mock.expectedHeaderReceived("ce-id", "myEventID");
- mock.expectedHeaderReceived("ce-source", "/somewhere");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
- e.getIn().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
- "specversion", CloudEventsProcessors.v02.getVersion(),
- "type", "org.apache.camel.event",
- "id", "myEventID",
- "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
- "source", "/somewhere",
- "data", "test"
- )));
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeContent() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/myEndpoint")
- .to("mock:ce");
-
- from("direct:source")
- .toF("undertow:http://localhost:%d/a/path", port);
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
- mock.expectedHeaderReceived("ce-id", "myEventID");
- mock.expectedHeaderReceived("ce-source", "/somewhere");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
- e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
- e.getIn().setHeader("ce-type", "org.apache.camel.event");
- e.getIn().setHeader("ce-id", "myEventID");
- e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("ce-source", "/somewhere");
- e.getIn().setBody("test");
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeContentWithFilter() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "ep1",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain",
- Knative.FILTER_HEADER_NAME, "ce-source",
- Knative.FILTER_HEADER_VALUE, "CE1"
- )),
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "ep2",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain",
- Knative.FILTER_HEADER_NAME, "ce-source",
- Knative.FILTER_HEADER_VALUE, "CE2"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/ep1")
- .convertBodyTo(String.class)
- .to("log:ce1?showAll=true&multiline=true")
- .to("mock:ce1");
- from("knative:endpoint/ep2")
- .convertBodyTo(String.class)
- .to("log:ce2?showAll=true&multiline=true")
- .to("mock:ce2");
-
- from("direct:source")
- .setBody()
- .constant("test")
- .setHeader(Exchange.HTTP_METHOD)
- .constant("POST")
- .setHeader(Exchange.HTTP_QUERY)
- .simple("filter.headerName=ce-source&filter.headerValue=${header.FilterVal}")
- .toD("undertow:http://localhost:" + port);
- }
- });
-
- context.start();
-
- MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
- mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event");
- mock1.expectedHeaderReceived("ce-id", "myEventID1");
- mock1.expectedHeaderReceived("ce-source", "CE1");
- mock1.expectedBodiesReceived("test");
- mock1.expectedMessageCount(1);
-
- MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
- mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
- mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event");
- mock2.expectedHeaderReceived("ce-id", "myEventID2");
- mock2.expectedHeaderReceived("ce-source", "CE2");
- mock2.expectedBodiesReceived("test");
- mock2.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader("FilterVal", "CE1");
- e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
- e.getIn().setHeader("ce-type", "org.apache.camel.event");
- e.getIn().setHeader("ce-id", "myEventID1");
- e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("ce-source", "CE1");
- }
- );
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader("FilterVal", "CE2");
- e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
- e.getIn().setHeader("ce-type", "org.apache.camel.event");
- e.getIn().setHeader("ce-id", "myEventID2");
- e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("ce-source", "CE2");
- }
- );
-
- mock1.assertIsSatisfied();
- mock2.assertIsSatisfied();
- }
-}
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
deleted file mode 100644
index c06523b..0000000
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Properties;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.knative.ce.CloudEventsProcessors;
-import org.apache.camel.component.knative.http.KnativeHttpEndpoint;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.properties.PropertiesComponent;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.AvailablePortFinder;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.apache.camel.component.knative.KnativeEnvironment.mandatoryLoadFromResource;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-public class KnativeComponentTest {
-
- private CamelContext context;
-
- // **************************
- //
- // Setup
- //
- // **************************
-
- @BeforeEach
- public void before() {
- this.context = new DefaultCamelContext();
- }
-
- @AfterEach
- public void after() throws Exception {
- if (this.context != null) {
- this.context.stop();
- }
- }
-
- // **************************
- //
- // Tests
- //
- // **************************
-
- @Test
- void testLoadEnvironment() throws Exception {
- KnativeEnvironment env = mandatoryLoadFromResource(context, "classpath:/environment.json");
-
- assertThat(env.stream()).hasSize(3);
- assertThat(env.stream()).anyMatch(s -> s.getType() == Knative.Type.channel);
- assertThat(env.stream()).anyMatch(s -> s.getType() == Knative.Type.endpoint);
-
- assertThat(env.lookupService(Knative.Type.channel, "c1")).isPresent();
- assertThat(env.lookupService(Knative.Type.channel, "e1")).isNotPresent();
- assertThat(env.lookupService(Knative.Type.endpoint, "e1")).isPresent();
- assertThat(env.lookupService(Knative.Type.endpoint, "c1")).isNotPresent();
-
- assertThat(env.lookupServiceOrDefault(Knative.Type.endpoint, "undefined"))
- .hasFieldOrPropertyWithValue("name", "default")
- .hasFieldOrPropertyWithValue("host", "0.0.0.0")
- .hasFieldOrPropertyWithValue("port", 8080);
-
- assertThat(env.lookupServiceOrDefault(Knative.Type.channel, "myChannel"))
- .hasFieldOrPropertyWithValue("name", "myChannel-channel")
- .hasFieldOrPropertyWithValue("host", "")
- .hasFieldOrPropertyWithValue("port", -1);
-
- assertThatThrownBy(() -> env.mandatoryLookupService(Knative.Type.endpoint, "unknown"))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Unable to find the service \"unknown\" with type \"endpoint\"");
-
- assertThatThrownBy(() -> env.mandatoryLookupService(Knative.Type.channel, "unknown"))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Unable to find the service \"unknown\" with type \"channel\"");
- }
-
- @Test
- void testCreateComponent() throws Exception {
- context.start();
-
- assertThat(context.getComponent("knative")).isNotNull();
- assertThat(context.getComponent("knative")).isInstanceOf(KnativeComponent.class);
- }
-
- @Test
- void testCreateEndpoint() throws Exception {
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "my-node",
- 9001,
- KnativeSupport.mapOf(Knative.SERVICE_META_PATH, "/a/path"))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setEnvironment(env);
-
- context.start();
-
- //
- // Endpoint with context path derived from service definition
- //
-
- KnativeEndpoint e1 = context.getEndpoint("knative:endpoint/myEndpoint", KnativeEndpoint.class);
-
- assertThat(e1).isNotNull();
- assertThat(e1.getService()).isNotNull();
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("name", "myEndpoint");
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("host", "my-node");
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("port", 9001);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://my-node:9001/a/path");
-
- //
- // Endpoint with context path overridden by endpoint uri
- //
-
- KnativeEndpoint e2 = context.getEndpoint("knative:endpoint/myEndpoint/another/path", KnativeEndpoint.class);
-
- assertThat(e2).isNotNull();
- assertThat(e2.getService()).isNotNull();
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("name", "myEndpoint");
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("host", "my-node");
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("port", 9001);
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("path", "/another/path");
- assertThat(e2.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://my-node:9001/another/path");
- }
-
- @Test
- void testCreateEndpointWithComputedHost() throws Exception {
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "",
- -1,
- KnativeSupport.mapOf(Knative.SERVICE_META_PATH, "/a/path"))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setEnvironment(env);
-
- context.start();
-
- //
- // Endpoint with context path derived from service definition
- //
-
- KnativeEndpoint e1 = context.getEndpoint("knative:endpoint/myEndpoint", KnativeEndpoint.class);
-
- assertThat(e1).isNotNull();
- assertThat(e1.getService()).isNotNull();
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("name", "myEndpoint");
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint:80/a/path");
- }
-
- @Test
- void testCreateEndpointWithComputedHostAndNamespace() throws Exception {
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "",
- -1,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.SERVICE_META_ZONE, "myNamespace"))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setEnvironment(env);
-
- context.start();
-
- //
- // Endpoint with context path derived from service definition
- //
-
- KnativeEndpoint e1 = context.getEndpoint("knative:endpoint/myEndpoint", KnativeEndpoint.class);
-
- assertThat(e1).isNotNull();
- assertThat(e1.getService()).isNotNull();
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("name", "myEndpoint");
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint.myNamespace:80/a/path");
- }
-
- @Test
- void testCreateEndpointWithComputedHostAndNamespaceWithProperty() throws Exception {
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "",
- -1,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.SERVICE_META_ZONE, "{{myNamespaceKey}}"))
- ));
-
- Properties properties = new Properties();
- properties.setProperty("myNamespaceKey", "myNamespace");
-
- PropertiesComponent pc = context.getComponent("properties", PropertiesComponent.class);
- pc.setInitialProperties(properties);
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setEnvironment(env);
-
- context.start();
-
- //
- // Endpoint with context path derived from service definition
- //
-
- KnativeEndpoint e1 = context.getEndpoint("knative:endpoint/myEndpoint", KnativeEndpoint.class);
-
- assertThat(e1).isNotNull();
- assertThat(e1.getService()).isNotNull();
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("name", "myEndpoint");
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/a/path");
- assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint.myNamespace:80/a/path");
- }
-
- @Test
- void testCreateEndpointWithDefaults() throws Exception {
- KnativeEnvironment env = new KnativeEnvironment(Collections.emptyList());
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setEnvironment(env);
-
- context.start();
-
- //
- // Endpoint with context path derived from service definition
- //
-
- KnativeEndpoint e1 = context.getEndpoint("knative:endpoint/myEndpoint/my/path", KnativeEndpoint.class);
-
- assertThat(e1).isNotNull();
- assertThat(e1.getService()).isNotNull();
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("name", "myEndpoint");
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.endpoint);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e1.getService()).hasFieldOrPropertyWithValue("path", "/my/path");
- assertThat(e1.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e1.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myEndpoint:80/my/path");
-
- //
- // Endpoint with context path overridden by endpoint uri
- //
-
- KnativeEndpoint e2 = context.getEndpoint("knative:channel/myChannel/another/path", KnativeEndpoint.class);
-
- assertThat(e2).isNotNull();
- assertThat(e2.getService()).isNotNull();
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("name", "myChannel-channel");
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("type", Knative.Type.channel);
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("protocol", Knative.Protocol.http);
- assertThat(e2.getService()).hasFieldOrPropertyWithValue("path", "/another/path");
- assertThat(e2.getEndpoint()).isInstanceOf(KnativeHttpEndpoint.class);
- assertThat(e2.getEndpoint()).hasFieldOrPropertyWithValue("endpointUri", "knative-http://myChannel-channel:80/another/path");
-
-
- }
-
- @Test
- void testInvokeEndpoint() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:source")
- .to("knative:endpoint/myEndpoint");
-
- fromF("undertow:http://localhost:%d/a/path", port)
- .to("mock:ce");
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().sendBody("direct:source", "test");
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeStructuredContent() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/myEndpoint")
- .to("mock:ce");
-
- from("direct:source")
- .toF("undertow:http://localhost:%d/a/path", port);
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock.expectedHeaderReceived("CE-EventID", "myEventID");
- mock.expectedHeaderReceived("CE-Source", "/somewhere");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
- e.getIn().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
- "cloudEventsVersion", "0.1",
- "eventType", "org.apache.camel.event",
- "eventID", "myEventID",
- "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
- "source", "/somewhere",
- "contentType", "text/plain",
- "data", "test"
- )));
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeContent() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "myEndpoint",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.SERVICE_META_PATH, "/a/path",
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/myEndpoint")
- .to("mock:ce");
-
- from("direct:source")
- .toF("undertow:http://localhost:%d/a/path", port);
- }
- });
-
- context.start();
-
- MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
- mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock.expectedHeaderReceived("CE-EventID", "myEventID");
- mock.expectedHeaderReceived("CE-Source", "/somewhere");
- mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock.expectedBodiesReceived("test");
- mock.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
- e.getIn().setHeader("CE-CloudEventsVersion", "0.1");
- e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
- e.getIn().setHeader("CE-EventID", "myEventID");
- e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("CE-Source", "/somewhere");
- e.getIn().setBody("test");
- }
- );
-
- mock.assertIsSatisfied();
- }
-
- @Test
- void testConsumeContentWithFilter() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "ep1",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain",
- Knative.FILTER_HEADER_NAME, "CE-Source",
- Knative.FILTER_HEADER_VALUE, "CE1"
- )),
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "ep2",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain",
- Knative.FILTER_HEADER_NAME, "CE-Source",
- Knative.FILTER_HEADER_VALUE, "CE2"
- ))
- ));
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/ep1")
- .convertBodyTo(String.class)
- .to("log:ce1?showAll=true&multiline=true")
- .to("mock:ce1");
- from("knative:endpoint/ep2")
- .convertBodyTo(String.class)
- .to("log:ce2?showAll=true&multiline=true")
- .to("mock:ce2");
-
- from("direct:source")
- .setBody()
- .constant("test")
- .setHeader(Exchange.HTTP_METHOD)
- .constant("POST")
- .setHeader(Exchange.HTTP_QUERY)
- .simple("filter.CE-Source=${header.FilterVal}")
- .toD("undertow:http://localhost:" + port);
- }
- });
-
- context.start();
-
- MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
- mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock1.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
- mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
- mock1.expectedHeaderReceived("CE-Source", "CE1");
- mock1.expectedBodiesReceived("test");
- mock1.expectedMessageCount(1);
-
- MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
- mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
- mock2.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
- mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
- mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
- mock2.expectedHeaderReceived("CE-Source", "CE2");
- mock2.expectedBodiesReceived("test");
- mock2.expectedMessageCount(1);
-
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader("FilterVal", "CE1");
- e.getIn().setHeader("CE-CloudEventsVersion", "0.1");
- e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
- e.getIn().setHeader("CE-EventID", "myEventID1");
- e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("CE-Source", "CE1");
- }
- );
- context.createProducerTemplate().send(
- "direct:source",
- e -> {
- e.getIn().setHeader("FilterVal", "CE2");
- e.getIn().setHeader("CE-CloudEventsVersion", "0.1");
- e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
- e.getIn().setHeader("CE-EventID", "myEventID2");
- e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
- e.getIn().setHeader("CE-Source", "CE2");
- }
- );
-
- mock1.assertIsSatisfied();
- mock2.assertIsSatisfied();
- }
-
- @Test
- void testReply() throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
-
- KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "from",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- )),
- new KnativeEnvironment.KnativeServiceDefinition(
- Knative.Type.endpoint,
- Knative.Protocol.http,
- "to",
- "localhost",
- port,
- KnativeSupport.mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
- Knative.CONTENT_TYPE, "text/plain"
- ))
- )
- );
-
- KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
- component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
- component.setEnvironment(env);
-
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("knative:endpoint/from")
- .convertBodyTo(String.class)
- .setBody().constant("consumer");
- from("direct:source")
- .to("knative://endpoint/to")
- .log("${body}")
- .to("mock:to");
- }
- });
-
- MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
- mock.expectedBodiesReceived("consumer");
- mock.expectedMessageCount(1);
-
- context.start();
- context.createProducerTemplate().sendBody("direct:source", "");
-
- mock.assertIsSatisfied();
- }
-}
diff --git a/pom.xml b/pom.xml
index 71437f8..8ca046d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,6 @@
<modules>
<module>tooling</module>
- <module>camel-knative-http</module>
<module>camel-knative</module>
<module>camel-k-runtime-core</module>
@@ -286,6 +285,11 @@
<!-- components -->
<dependency>
<groupId>org.apache.camel.k</groupId>
+ <artifactId>camel-knative-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.k</groupId>
<artifactId>camel-knative</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/tooling/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java b/tooling/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java
index 53d99a2..7fcb7f8 100644
--- a/tooling/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java
+++ b/tooling/camel-k-maven-plugin/src/main/java/org/apache/camel/k/tooling/maven/processors/CatalogProcessor3x.java
@@ -208,6 +208,8 @@ public class CatalogProcessor3x implements CatalogProcessor {
artifact.setVersion(project.getVersion());
artifact.createScheme("knative").setHttp(true);
artifact.addDependency("org.apache.camel", "camel-cloud");
+ artifact.addDependency("org.apache.camel.k", "camel-knative-api");
+ artifact.addDependency("org.apache.camel.k", "camel-knative-http");
artifacts.put(artifact.getArtifactId(), artifact);
}
@@ -257,6 +259,7 @@ public class CatalogProcessor3x implements CatalogProcessor {
artifact.addDependency("org.apache.camel", "camel-cloud");
artifact.addDependency("org.apache.camel.k", "camel-k-loader-yaml");
artifact.addDependency("org.apache.camel.k", "camel-k-loader-knative");
+ artifact.addDependency("org.apache.camel.k", "camel-knative-api");
artifact.addDependency("org.apache.camel.k", "camel-knative");
artifact.addDependency("org.apache.camel.k", "camel-knative-http");