You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/02/04 07:41:22 UTC
[camel-k] branch master updated: camel-knative: support for cloud
events specs v0.2 #376
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
new 3c3e250 camel-knative: support for cloud events specs v0.2 #376
3c3e250 is described below
commit 3c3e2502808c8b93f728c1d3ae5e94b8ad110741
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Feb 1 17:03:06 2019 +0100
camel-knative: support for cloud events specs v0.2 #376
---
.../camel/component/knative/KnativeComponent.java | 30 +-
.../component/knative/KnativeConfiguration.java | 22 ++
.../camel/component/knative/KnativeEndpoint.java | 87 +----
.../camel/component/knative/KnativeSupport.java | 13 -
.../knative/ce/CloudEventsProcessors.java | 65 ++++
.../org/apache/camel/component/knative/ce/V01.java | 102 ++++++
.../org/apache/camel/component/knative/ce/V02.java | 102 ++++++
.../component/knative/CloudEventsV01Test.java | 349 +++++++++++++++++++++
.../component/knative/CloudEventsV02Test.java | 349 +++++++++++++++++++++
9 files changed, 1019 insertions(+), 100 deletions(-)
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 264f644..60b7835 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -30,8 +30,6 @@ public class KnativeComponent extends DefaultComponent {
private final KnativeConfiguration configuration;
private String environmentPath;
- private boolean jsonSerializationEnabled;
-
public KnativeComponent() {
this(null);
}
@@ -74,6 +72,22 @@ public class KnativeComponent extends DefaultComponent {
configuration.setEnvironment(environment);
}
+ public boolean isJsonSerializationEnabled() {
+ return configuration.isJsonSerializationEnabled();
+ }
+
+ public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
+ configuration.setJsonSerializationEnabled(jsonSerializationEnabled);
+ }
+
+ public String getCloudEventsSpecVersion() {
+ return configuration.getCloudEventsSpecVersion();
+ }
+
+ public void setCloudEventsSpecVersion(String cloudEventSpecVersion) {
+ configuration.setCloudEventsSpecVersion(cloudEventSpecVersion);
+ }
+
// ************************
//
//
@@ -105,11 +119,11 @@ public class KnativeComponent extends DefaultComponent {
String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE);
if (environmentPath != null) {
conf.setEnvironment(
- KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath)
+ KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath)
);
} else if (envConfig != null) {
conf.setEnvironment(
- KnativeEnvironment.mandatoryLoadFromSerializedString(getCamelContext(), envConfig)
+ KnativeEnvironment.mandatoryLoadFromSerializedString(getCamelContext(), envConfig)
);
} else {
throw new IllegalStateException("Cannot load Knative configuration from file or env variable");
@@ -118,12 +132,4 @@ public class KnativeComponent extends DefaultComponent {
return conf;
}
-
- public boolean isJsonSerializationEnabled() {
- return jsonSerializationEnabled;
- }
-
- public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
- this.jsonSerializationEnabled = jsonSerializationEnabled;
- }
}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index cc3fa65..967f7dd 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -25,6 +25,12 @@ public class KnativeConfiguration implements Cloneable {
@Metadata(required = "true")
private KnativeEnvironment environment;
+ @UriParam(defaultValue = "false")
+ private boolean jsonSerializationEnabled;
+
+ @UriParam(defaultValue = "0.1", enums = "0.1,0.2")
+ private String cloudEventsSpecVersion = "0.1";
+
public KnativeConfiguration() {
}
@@ -45,6 +51,22 @@ public class KnativeConfiguration implements Cloneable {
this.environment = environment;
}
+ public boolean isJsonSerializationEnabled() {
+ return jsonSerializationEnabled;
+ }
+
+ public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
+ this.jsonSerializationEnabled = jsonSerializationEnabled;
+ }
+
+ public String getCloudEventsSpecVersion() {
+ return cloudEventsSpecVersion;
+ }
+
+ public void setCloudEventsSpecVersion(String cloudEventsSpecVersion) {
+ this.cloudEventsSpecVersion = cloudEventsSpecVersion;
+ }
+
// ************************
//
// Cloneable
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 89c7f3b..cceb218 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -16,15 +16,17 @@
*/
package org.apache.camel.component.knative;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.DelegateEndpoint;
import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.knative.ce.CloudEventsProcessors;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.Metadata;
@@ -35,16 +37,6 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StringHelper;
import org.apache.camel.util.URISupport;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.InputStream;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
@UriEndpoint(
@@ -107,73 +99,18 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
@Override
public Producer createProducer() throws Exception {
- return new KnativeProducer(
- this,
- exchange -> {
- final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
- final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
- final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
- final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
- final Map<String, Object> headers = exchange.getIn().getHeaders();
-
- headers.putIfAbsent("CE-CloudEventsVersion", "0.1");
- headers.putIfAbsent("CE-EventType", eventType);
- headers.putIfAbsent("CE-EventID", exchange.getExchangeId());
- headers.putIfAbsent("CE-EventTime", eventTime);
- headers.putIfAbsent("CE-Source", getEndpointUri());
- headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
-
- // Always remove host so it's always computed from the URL and not inherited from the exchange
- headers.remove("Host");
- },
- new KnativeConversionProcessor(getComponent().isJsonSerializationEnabled()),
- endpoint.createProducer()
- );
+ final String version = configuration.getCloudEventsSpecVersion();
+ final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).producerProcessor(this);
+ final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
+
+ return new KnativeProducer(this, ceProcessor, ceConverter, endpoint.createProducer());
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- final Processor pipeline = Pipeline.newInstance(
- getCamelContext(),
- exchange -> {
- if (!KnativeSupport.hasStructuredContent(exchange)) {
- //
- // The event is not in the form of Structured Content Mode
- // then leave it as it is.
- //
- // Note that this is true for http binding only.
- //
- // More info:
- //
- // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
- //
- return;
- }
-
- try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
- final Message message = exchange.getIn();
- final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
-
- ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
- ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
-
- //
- // Map extensions to standard camel headers
- //
- ifNotEmpty(ce.remove("extensions"), val -> {
- if (val instanceof Map) {
- ((Map<String, Object>) val).forEach(message::setHeader);
- }
- });
-
- ce.forEach((key, val) -> {
- message.setHeader("CE-" + StringUtils.capitalize(key), val);
- });
- }
- },
- processor
- );
-
+ final String version = configuration.getCloudEventsSpecVersion();
+ final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).consumerProcessor(this);
+ final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
final Consumer consumer = endpoint.createConsumer(pipeline);
configureConsumer(consumer);
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
index f84ac46..9c6c049 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
@@ -16,15 +16,11 @@
*/
package org.apache.camel.component.knative;
-import java.io.UnsupportedEncodingException;
-import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.camel.Exchange;
-import org.apache.camel.util.CollectionHelper;
-import org.apache.camel.util.URISupport;
public final class KnativeSupport {
private KnativeSupport() {
@@ -47,13 +43,4 @@ public final class KnativeSupport {
return answer;
}
-
- public static String appendParametersToURI(String uri, String key, Object value, Object... keyVals)
- throws UnsupportedEncodingException, URISyntaxException {
-
- return URISupport.appendParametersToURI(
- uri,
- CollectionHelper.mapOf(key, value, keyVals)
- );
- }
}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java
new file mode 100644
index 0000000..49ebee4d
--- /dev/null
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.util.function.Function;
+
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.KnativeEndpoint;
+
+public enum CloudEventsProcessors {
+ v01("0.1", V01.PRODUCER, V01.CONSUMER),
+ v02("0.2", V02.PRODUCER, V02.CONSUMER);
+
+ private final String version;
+ private final Function<KnativeEndpoint, Processor> producer;
+ private final Function<KnativeEndpoint, Processor> consumer;
+
+ CloudEventsProcessors(String version, Function<KnativeEndpoint, Processor> producer, Function<KnativeEndpoint, Processor> consumer) {
+ this.version = version;
+ this.producer = producer;
+ this.consumer = consumer;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public Processor producerProcessor(KnativeEndpoint endpoint) {
+ return this.producer.apply(endpoint);
+ }
+
+ public Processor consumerProcessor(KnativeEndpoint endpoint) {
+ return this.consumer.apply(endpoint);
+ }
+
+ // **************************
+ //
+ // Helpers
+ //
+ // **************************
+
+ public static CloudEventsProcessors forSpecversion(String version) {
+ for (CloudEventsProcessors ce : CloudEventsProcessors.values()) {
+ if (ce.version.equals(version)) {
+ return ce;
+ }
+ }
+
+ throw new IllegalArgumentException("Unable to find processors for spec version: " + version);
+ }
+}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java
new file mode 100644
index 0000000..4505d8e
--- /dev/null
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.Knative;
+import org.apache.camel.component.knative.KnativeEndpoint;
+import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.component.knative.KnativeSupport;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
+
+final class V01 {
+ private V01() {
+ }
+
+ public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> {
+ KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService();
+ String uri = endpoint.getEndpointUri();
+
+ return exchange -> {
+ final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
+ final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
+ final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+ final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+ final Map<String, Object> headers = exchange.getIn().getHeaders();
+
+ headers.putIfAbsent("CE-CloudEventsVersion", "0.1");
+ headers.putIfAbsent("CE-EventType", eventType);
+ headers.putIfAbsent("CE-EventID", exchange.getExchangeId());
+ headers.putIfAbsent("CE-EventTime", eventTime);
+ headers.putIfAbsent("CE-Source", uri);
+ headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+ // Always remove host so it's always computed from the URL and not inherited from the exchange
+ headers.remove("Host");
+ };
+ };
+
+ public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> {
+ return exchange -> {
+ if (!KnativeSupport.hasStructuredContent(exchange)) {
+ //
+ // The event is not in the form of Structured Content Mode
+ // then leave it as it is.
+ //
+ // Note that this is true for http binding only.
+ //
+ // More info:
+ //
+ // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
+ //
+ return;
+ }
+
+ try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
+ final Message message = exchange.getIn();
+ final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
+
+ ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
+ ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
+
+ //
+ // Map extensions to standard camel headers
+ //
+ ifNotEmpty(ce.remove("extensions"), val -> {
+ if (val instanceof Map) {
+ ((Map<String, Object>) val).forEach(message::setHeader);
+ }
+ });
+
+ ce.forEach((key, val) -> {
+ message.setHeader("CE-" + StringUtils.capitalize(key), val);
+ });
+ }
+ };
+ };
+}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java
new file mode 100644
index 0000000..321c502
--- /dev/null
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.Knative;
+import org.apache.camel.component.knative.KnativeEndpoint;
+import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.component.knative.KnativeSupport;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
+
+final class V02 {
+ private V02() {
+ }
+
+ public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> {
+ KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService();
+ String uri = endpoint.getEndpointUri();
+
+ return exchange -> {
+ final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
+ final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
+ final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+ final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+ final Map<String, Object> headers = exchange.getIn().getHeaders();
+
+ headers.putIfAbsent("ce-specversion", "0.2");
+ headers.putIfAbsent("ce-type", eventType);
+ headers.putIfAbsent("ce-id", exchange.getExchangeId());
+ headers.putIfAbsent("ce-time", eventTime);
+ headers.putIfAbsent("ce-source", uri);
+ headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+ // Always remove host so it's always computed from the URL and not inherited from the exchange
+ headers.remove("Host");
+ };
+ };
+
+ public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> {
+ return exchange -> {
+ if (!KnativeSupport.hasStructuredContent(exchange)) {
+ //
+ // The event is not in the form of Structured Content Mode
+ // then leave it as it is.
+ //
+ // Note that this is true for http binding only.
+ //
+ // More info:
+ //
+ // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
+ //
+ return;
+ }
+
+ try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
+ final Message message = exchange.getIn();
+ final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
+
+ ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
+ ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
+
+ //
+ // Map extensions to standard camel headers
+ //
+ ifNotEmpty(ce.remove("extensions"), val -> {
+ if (val instanceof Map) {
+ ((Map<String, Object>) val).forEach(message::setHeader);
+ }
+ });
+
+ ce.forEach((key, val) -> {
+ message.setHeader("ce-" + StringUtils.lowerCase(key), val);
+ });
+ }
+ };
+ };
+}
diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
new file mode 100644
index 0000000..f5a9ecc
--- /dev/null
+++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.knative.ce.CloudEventsProcessors;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class CloudEventsV01Test {
+
+ private CamelContext context;
+
+ // **************************
+ //
+ // Setup
+ //
+ // **************************
+
+ @BeforeEach
+ public void before() {
+ this.context = new DefaultCamelContext();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ if (this.context != null) {
+ this.context.stop();
+ }
+ }
+
+ // **************************
+ //
+ // Tests
+ //
+ // **************************
+
+ @Test
+ void testInvokeEndpoint() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "myEndpoint",
+ "localhost",
+ port,
+ mapOf(
+ ServiceDefinition.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("knative:endpoint/myEndpoint");
+
+ fromF("netty4-http:http://localhost:%d/a/path", port)
+ .to("mock:ce");
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+ mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
+ mock.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setBody("test");
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ void testConsumeStructuredContent() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "myEndpoint",
+ "localhost",
+ port,
+ mapOf(
+ ServiceDefinition.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint")
+ .to("mock:ce");
+
+ from("direct:source")
+ .toF("netty4-http:http://localhost:%d/a/path", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+ mock.expectedHeaderReceived("CE-EventID", "myEventID");
+ mock.expectedHeaderReceived("CE-Source", "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+ mock.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+ e.getIn().setBody(new ObjectMapper().writeValueAsString(mapOf(
+ "cloudEventsVersion", CloudEventsProcessors.v01.getVersion(),
+ "eventType", "org.apache.camel.event",
+ "eventID", "myEventID",
+ "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+ "source", "/somewhere",
+ "data", "test"
+ )));
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ void testConsumeContent() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "myEndpoint",
+ "localhost",
+ port,
+ mapOf(
+ ServiceDefinition.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint")
+ .to("mock:ce");
+
+ from("direct:source")
+ .toF("http4://localhost:%d/a/path", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+ mock.expectedHeaderReceived("CE-EventID", "myEventID");
+ mock.expectedHeaderReceived("CE-Source", "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+ mock.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
+ e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+ e.getIn().setHeader("CE-EventID", "myEventID");
+ e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getIn().setHeader("CE-Source", "/somewhere");
+ e.getIn().setBody("test");
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ void testConsumeContentWithFilter() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "ep1",
+ "localhost",
+ port,
+ mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.FILTER_HEADER_NAME, "CE-Source",
+ Knative.FILTER_HEADER_VALUE, "CE1"
+ )),
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "ep2",
+ "localhost",
+ port,
+ mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.FILTER_HEADER_NAME, "CE-Source",
+ Knative.FILTER_HEADER_VALUE, "CE2"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/ep1")
+ .convertBodyTo(String.class)
+ .to("log:ce1?showAll=true&multiline=true")
+ .to("mock:ce1");
+ from("knative:endpoint/ep2")
+ .convertBodyTo(String.class)
+ .to("log:ce2?showAll=true&multiline=true")
+ .to("mock:ce2");
+
+ from("direct:source")
+ .setBody()
+ .constant("test")
+ .setHeader(Exchange.HTTP_METHOD)
+ .constant("POST")
+ .setHeader(Exchange.HTTP_QUERY)
+ .simple("filter.headerName=CE-Source&filter.headerValue=${header.FilterVal}")
+ .toD("http4://localhost:" + port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+ mock1.expectedMessageCount(1);
+ mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+ mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+ mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
+ mock1.expectedHeaderReceived("CE-Source", "CE1");
+ mock1.expectedBodiesReceived("test");
+
+ MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+ mock2.expectedMessageCount(1);
+ mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+ mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+ mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
+ mock2.expectedHeaderReceived("CE-Source", "CE2");
+ mock2.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader("FilterVal", "CE1");
+ e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+ e.getIn().setHeader("CE-EventID", "myEventID1");
+ e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getIn().setHeader("CE-Source", "CE1");
+ }
+ );
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader("FilterVal", "CE2");
+ e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+ e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+ e.getIn().setHeader("CE-EventID", "myEventID2");
+ e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getIn().setHeader("CE-Source", "CE2");
+ }
+ );
+
+ mock1.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+ }
+}
diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
new file mode 100644
index 0000000..3c32915
--- /dev/null
+++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.knative.ce.CloudEventsProcessors;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class CloudEventsV02Test {
+
+ private CamelContext context;
+
+ // **************************
+ //
+ // Setup
+ //
+ // **************************
+
+ @BeforeEach
+ public void before() {
+ this.context = new DefaultCamelContext();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ if (this.context != null) {
+ this.context.stop();
+ }
+ }
+
+ // **************************
+ //
+ // Tests
+ //
+ // **************************
+
+ @Test
+ void testInvokeEndpoint() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "myEndpoint",
+ "localhost",
+ port,
+ mapOf(
+ ServiceDefinition.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("knative:endpoint/myEndpoint");
+
+ fromF("netty4-http:http://localhost:%d/a/path", port)
+ .to("mock:ce");
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+ mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
+ mock.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setBody("test");
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ void testConsumeStructuredContent() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "myEndpoint",
+ "localhost",
+ port,
+ mapOf(
+ ServiceDefinition.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint")
+ .to("mock:ce");
+
+ from("direct:source")
+ .toF("netty4-http:http://localhost:%d/a/path", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+ mock.expectedHeaderReceived("ce-id", "myEventID");
+ mock.expectedHeaderReceived("ce-source", "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+ mock.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+ e.getIn().setBody(new ObjectMapper().writeValueAsString(mapOf(
+ "specversion", CloudEventsProcessors.v02.getVersion(),
+ "type", "org.apache.camel.event",
+ "id", "myEventID",
+ "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+ "source", "/somewhere",
+ "data", "test"
+ )));
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ void testConsumeContent() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "myEndpoint",
+ "localhost",
+ port,
+ mapOf(
+ ServiceDefinition.SERVICE_META_PATH, "/a/path",
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/myEndpoint")
+ .to("mock:ce");
+
+ from("direct:source")
+ .toF("http4://localhost:%d/a/path", port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+ mock.expectedHeaderReceived("ce-id", "myEventID");
+ mock.expectedHeaderReceived("ce-source", "/somewhere");
+ mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+ mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+ mock.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
+ e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ e.getIn().setHeader("ce-type", "org.apache.camel.event");
+ e.getIn().setHeader("ce-id", "myEventID");
+ e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getIn().setHeader("ce-source", "/somewhere");
+ e.getIn().setBody("test");
+ }
+ );
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ void testConsumeContentWithFilter() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+
+ KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "ep1",
+ "localhost",
+ port,
+ mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.FILTER_HEADER_NAME, "ce-source",
+ Knative.FILTER_HEADER_VALUE, "CE1"
+ )),
+ new KnativeEnvironment.KnativeServiceDefinition(
+ Knative.Type.endpoint,
+ Knative.Protocol.http,
+ "ep2",
+ "localhost",
+ port,
+ mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.FILTER_HEADER_NAME, "ce-source",
+ Knative.FILTER_HEADER_VALUE, "CE2"
+ ))
+ ));
+
+ KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+ component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+ component.setEnvironment(env);
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/ep1")
+ .convertBodyTo(String.class)
+ .to("log:ce1?showAll=true&multiline=true")
+ .to("mock:ce1");
+ from("knative:endpoint/ep2")
+ .convertBodyTo(String.class)
+ .to("log:ce2?showAll=true&multiline=true")
+ .to("mock:ce2");
+
+ from("direct:source")
+ .setBody()
+ .constant("test")
+ .setHeader(Exchange.HTTP_METHOD)
+ .constant("POST")
+ .setHeader(Exchange.HTTP_QUERY)
+ .simple("filter.headerName=ce-source&filter.headerValue=${header.FilterVal}")
+ .toD("http4://localhost:" + port);
+ }
+ });
+
+ context.start();
+
+ MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+ mock1.expectedMessageCount(1);
+ mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+ mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+ mock1.expectedHeaderReceived("ce-id", "myEventID1");
+ mock1.expectedHeaderReceived("ce-source", "CE1");
+ mock1.expectedBodiesReceived("test");
+
+ MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+ mock2.expectedMessageCount(1);
+ mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+ mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+ mock2.expectedHeaderReceived("ce-id", "myEventID2");
+ mock2.expectedHeaderReceived("ce-source", "CE2");
+ mock2.expectedBodiesReceived("test");
+
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader("FilterVal", "CE1");
+ e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ e.getIn().setHeader("ce-type", "org.apache.camel.event");
+ e.getIn().setHeader("ce-id", "myEventID1");
+ e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getIn().setHeader("ce-source", "CE1");
+ }
+ );
+ context.createProducerTemplate().send(
+ "direct:source",
+ e -> {
+ e.getIn().setHeader("FilterVal", "CE2");
+ e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
+ e.getIn().setHeader("ce-type", "org.apache.camel.event");
+ e.getIn().setHeader("ce-id", "myEventID2");
+ e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+ e.getIn().setHeader("ce-source", "CE2");
+ }
+ );
+
+ mock1.assertIsSatisfied();
+ mock2.assertIsSatisfied();
+ }
+}