You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/10/25 15:47:42 UTC

[camel-k-runtime] branch master updated: Provide support for statically defined cloudevent headers #175

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd9bf7  Provide support for statically defined cloudevent headers #175
     new 0f244a6  Merge pull request #176 from lburgazzoli/github-175
1cd9bf7 is described below

commit 1cd9bf7213a3992d677877bb3da4c2b8ce730547
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 25 17:28:47 2019 +0200

    Provide support for statically defined cloudevent headers #175
---
 .../camel/component/knative/spi/Knative.java       |   1 +
 .../component/knative/spi/KnativeEnvironment.java  |   3 +-
 .../component/knative/http/KnativeHttpTest.java    | 186 +++++++++++++++++++++
 .../camel/component/knative/KnativeComponent.java  |   6 +
 .../component/knative/KnativeConfiguration.java    |  13 ++
 .../camel/component/knative/KnativeEndpoint.java   |  13 ++
 .../knative/ce/AbstractCloudEventProcessor.java    |   9 +
 7 files changed, 230 insertions(+), 1 deletion(-)

diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
index 65c2393..4d52932 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
@@ -24,6 +24,7 @@ public final class Knative {
 
     public static final String KNATIVE_TRANSPORT_RESOURCE_PATH = "META-INF/services/org/apache/camel/knative/transport/";
     public static final String KNATIVE_FILTER_PREFIX = "filter.";
+    public static final String KNATIVE_CE_OVERRIDE_PREFIX = "ce.override.";
     public static final String KNATIVE_TYPE = "knative.type";
     public static final String KNATIVE_EVENT_TYPE = "knative.event.type";
     public static final String KNATIVE_KIND = "knative.kind";
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
index f74fb5b..1702ae2 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java
@@ -80,7 +80,7 @@ public class KnativeEnvironment {
             // {
             //     "services": [
             //         {
-            //              "type": "channel|endpoint",
+            //              "type": "channel|endpoint|event",
             //              "name": "",
             //              "host": "",
             //              "port": "",
@@ -91,6 +91,7 @@ public class KnativeEnvironment {
             //                  "knative.kind": "",
             //                  "knative.apiVersion": "",
             //                  "camel.endpoint.kind": "source|sink",
+            //                  "ce.override.ce-type": "something",
             //              }
             //         },
             //     ]
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index e9f13df..9628838 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -1248,5 +1249,190 @@ public class KnativeHttpTest {
             server.stop();
         }
     }
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
+        final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final String typeHeaderVal = UUID.randomUUID().toString();
+        final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+        final String sourceHeaderVal = UUID.randomUUID().toString();
+
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
+                    Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
+                )
+            )
+        );
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(port, "localhost")
+            .setHandler(se -> {
+                exchange.set(se);
+                latch.countDown();
+            })
+            .build();
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .to("knative:endpoint/ep");
+        });
+
+        context.start();
+        try {
+            server.start();
+            template.sendBody("direct:start", "");
+
+            latch.await();
+
+            HeaderMap headers = exchange.get().getRequestHeaders();
+
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        }  finally {
+            server.stop();
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
+        final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final String typeHeaderVal = UUID.randomUUID().toString();
+        final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+        final String sourceHeaderVal = UUID.randomUUID().toString();
+
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )
+            )
+        );
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(port, "localhost")
+            .setHandler(se -> {
+                exchange.set(se);
+                latch.countDown();
+            })
+            .build();
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .toF("knative:endpoint/ep?%s=%s&%s=%s",
+                    Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
+                    Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal);
+        });
+
+        context.start();
+        try {
+            server.start();
+            template.sendBody("direct:start", "");
+
+            latch.await();
+
+            HeaderMap headers = exchange.get().getRequestHeaders();
+
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        }  finally {
+            server.stop();
+        }
+    }
+
+
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
+        final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final String typeHeaderVal = UUID.randomUUID().toString();
+        final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+        final String sourceHeaderVal = UUID.randomUUID().toString();
+
+        KnativeComponent component = configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )
+            )
+        );
+
+        component.getConfiguration().setCeOverride(KnativeSupport.mapOf(
+            Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
+            Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
+        ));
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(port, "localhost")
+            .setHandler(se -> {
+                exchange.set(se);
+                latch.countDown();
+            })
+            .build();
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .to("knative:endpoint/ep");
+        });
+
+        context.start();
+        try {
+            server.start();
+            template.sendBody("direct:start", "");
+
+            latch.await();
+
+            HeaderMap headers = exchange.get().getRequestHeaders();
+
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        }  finally {
+            server.stop();
+        }
+    }
 }
 
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 3cb34c2..6458c09 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -226,6 +226,9 @@ public class KnativeComponent extends DefaultComponent {
         conf.getTransportOptions().putAll(
             PropertiesHelper.extractProperties(parameters, "transport.", true)
         );
+        conf.getCeOverride().putAll(
+            PropertiesHelper.extractProperties(parameters, "ce.override.", true)
+        );
 
         // set properties from the endpoint uri
         PropertyBindingSupport.bindProperties(getCamelContext(), conf, parameters);
@@ -252,6 +255,9 @@ public class KnativeComponent extends DefaultComponent {
         if (conf.getFilters() == null) {
             conf.setFilters(new HashMap<>());
         }
+        if (conf.getCeOverride() == null) {
+            conf.setCeOverride(new HashMap<>());
+        }
 
         if (conf.getEnvironment() == null) {
             String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE);
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 277f33c..eb93fb7 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -40,6 +40,8 @@ public class KnativeConfiguration implements Cloneable {
     private Map<String, Object> transportOptions;
     @UriParam(prefix = "filter.")
     private Map<String, Object> filters;
+    @UriParam(prefix = "ce.override.")
+    private Map<String, Object> ceOverride;
     @UriParam(label = "advanced")
     private String apiVersion;
     @UriParam(label = "advanced")
@@ -131,6 +133,17 @@ public class KnativeConfiguration implements Cloneable {
         this.filters = filters;
     }
 
+    public Map<String, Object> getCeOverride() {
+        return ceOverride;
+    }
+
+    /**
+     * CloudEvent headers to override
+     */
+    public void setCeOverride(Map<String, Object> ceOverride) {
+        this.ceOverride = ceOverride;
+    }
+
     public String getApiVersion() {
         return apiVersion;
     }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index d06adf9..a9c75d5 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -158,6 +158,19 @@ public class KnativeEndpoint extends DefaultEndpoint {
             }
         }
 
+        for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
+            String key = entry.getKey();
+            Object val = entry.getValue();
+
+            if (val instanceof String) {
+                if (!key.startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
+                    key = Knative.KNATIVE_CE_OVERRIDE_PREFIX + key;
+                }
+
+                metadata.put(key, (String)val);
+            }
+        }
+
         if (service.get().getType() == Knative.Type.event) {
             metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName);
             metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), serviceName);
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 6d1af26..4cd638a 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -91,6 +91,15 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
             headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), eventType);
             headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), eventTime);
             headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+            for (Map.Entry<String, String> entry: service.getMetadata().entrySet()) {
+                if (entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
+                    final String key = entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length());
+                    final String val = entry.getValue();
+
+                    headers.put(key, val);
+                }
+            }
         };
     }
 }