You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/10/25 15:47:42 UTC
[camel-k-runtime] branch master updated: Provide support for statically defined cloudevent headers #175
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd9bf7 Provide support for statically defined cloudevent headers #175
new 0f244a6 Merge pull request #176 from lburgazzoli/github-175
1cd9bf7 is described below
commit 1cd9bf7213a3992d677877bb3da4c2b8ce730547
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 25 17:28:47 2019 +0200
Provide support for statically defined cloudevent headers #175
---
.../camel/component/knative/spi/Knative.java | 1 +
.../component/knative/spi/KnativeEnvironment.java | 3 +-
.../component/knative/http/KnativeHttpTest.java | 186 +++++++++++++++++++++
.../camel/component/knative/KnativeComponent.java | 6 +
.../component/knative/KnativeConfiguration.java | 13 ++
.../camel/component/knative/KnativeEndpoint.java | 13 ++
.../knative/ce/AbstractCloudEventProcessor.java | 9 +
7 files changed, 230 insertions(+), 1 deletion(-)
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
index 65c2393..4d52932 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
@@ -24,6 +24,7 @@ public final class Knative {
public static final String KNATIVE_TRANSPORT_RESOURCE_PATH = "META-INF/services/org/apache/camel/knative/transport/";
public static final String KNATIVE_FILTER_PREFIX = "filter.";
+ public static final String KNATIVE_CE_OVERRIDE_PREFIX = "ce.override.";
public static final String KNATIVE_TYPE = "knative.type";
public static final String KNATIVE_EVENT_TYPE = "knative.event.type";
public static final String KNATIVE_KIND = "knative.kind";
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
index f74fb5b..1702ae2 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
@@ -80,7 +80,7 @@ public class KnativeEnvironment {
// {
// "services": [
// {
- // "type": "channel|endpoint",
+ // "type": "channel|endpoint|event",
// "name": "",
// "host": "",
// "port": "",
@@ -91,6 +91,7 @@ public class KnativeEnvironment {
// "knative.kind": "",
// "knative.apiVersion": "",
// "camel.endpoint.kind": "source|sink",
+ // "ce.override.ce-type": "something",
// }
// },
// ]
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index e9f13df..9628838 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -1248,5 +1249,190 @@ public class KnativeHttpTest {
server.stop();
}
}
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
+ final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+ final String typeHeaderVal = UUID.randomUUID().toString();
+ final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+ final String sourceHeaderVal = UUID.randomUUID().toString();
+
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain",
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
+ )
+ )
+ );
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+ Undertow server = Undertow.builder()
+ .addHttpListener(port, "localhost")
+ .setHandler(se -> {
+ exchange.set(se);
+ latch.countDown();
+ })
+ .build();
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ latch.await();
+
+ HeaderMap headers = exchange.get().getRequestHeaders();
+
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+ assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
+ final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+ final String typeHeaderVal = UUID.randomUUID().toString();
+ final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+ final String sourceHeaderVal = UUID.randomUUID().toString();
+
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ );
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+ Undertow server = Undertow.builder()
+ .addHttpListener(port, "localhost")
+ .setHandler(se -> {
+ exchange.set(se);
+ latch.countDown();
+ })
+ .build();
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .toF("knative:endpoint/ep?%s=%s&%s=%s",
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal);
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ latch.await();
+
+ HeaderMap headers = exchange.get().getRequestHeaders();
+
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+ assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
+
+
+
+ @ParameterizedTest
+ @MethodSource("provideCloudEventsImplementations")
+ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
+ final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+ final String typeHeaderVal = UUID.randomUUID().toString();
+ final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+ final String sourceHeaderVal = UUID.randomUUID().toString();
+
+ KnativeComponent component = configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ );
+
+ component.getConfiguration().setCeOverride(KnativeSupport.mapOf(
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
+ Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
+ ));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+ Undertow server = Undertow.builder()
+ .addHttpListener(port, "localhost")
+ .setHandler(se -> {
+ exchange.set(se);
+ latch.countDown();
+ })
+ .build();
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+ try {
+ server.start();
+ template.sendBody("direct:start", "");
+
+ latch.await();
+
+ HeaderMap headers = exchange.get().getRequestHeaders();
+
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+ assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+ assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+ } finally {
+ server.stop();
+ }
+ }
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 3cb34c2..6458c09 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -226,6 +226,9 @@ public class KnativeComponent extends DefaultComponent {
conf.getTransportOptions().putAll(
PropertiesHelper.extractProperties(parameters, "transport.", true)
);
+ conf.getCeOverride().putAll(
+ PropertiesHelper.extractProperties(parameters, "ce.override.", true)
+ );
// set properties from the endpoint uri
PropertyBindingSupport.bindProperties(getCamelContext(), conf, parameters);
@@ -252,6 +255,9 @@ public class KnativeComponent extends DefaultComponent {
if (conf.getFilters() == null) {
conf.setFilters(new HashMap<>());
}
+ if (conf.getCeOverride() == null) {
+ conf.setCeOverride(new HashMap<>());
+ }
if (conf.getEnvironment() == null) {
String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE);
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 277f33c..eb93fb7 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -40,6 +40,8 @@ public class KnativeConfiguration implements Cloneable {
private Map<String, Object> transportOptions;
@UriParam(prefix = "filter.")
private Map<String, Object> filters;
+ @UriParam(prefix = "ce.override.")
+ private Map<String, Object> ceOverride;
@UriParam(label = "advanced")
private String apiVersion;
@UriParam(label = "advanced")
@@ -131,6 +133,17 @@ public class KnativeConfiguration implements Cloneable {
this.filters = filters;
}
+ public Map<String, Object> getCeOverride() {
+ return ceOverride;
+ }
+
+ /**
+ * CloudEvent headers to override
+ */
+ public void setCeOverride(Map<String, Object> ceOverride) {
+ this.ceOverride = ceOverride;
+ }
+
public String getApiVersion() {
return apiVersion;
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index d06adf9..a9c75d5 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -158,6 +158,19 @@ public class KnativeEndpoint extends DefaultEndpoint {
}
}
+ for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
+ String key = entry.getKey();
+ Object val = entry.getValue();
+
+ if (val instanceof String) {
+ if (!key.startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
+ key = Knative.KNATIVE_CE_OVERRIDE_PREFIX + key;
+ }
+
+ metadata.put(key, (String)val);
+ }
+ }
+
if (service.get().getType() == Knative.Type.event) {
metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName);
metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), serviceName);
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 6d1af26..4cd638a 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -91,6 +91,15 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), eventType);
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), eventTime);
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+ for (Map.Entry<String, String> entry: service.getMetadata().entrySet()) {
+ if (entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
+ final String key = entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length());
+ final String val = entry.getValue();
+
+ headers.put(key, val);
+ }
+ }
};
}
}