You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/12/13 18:04:59 UTC

[camel-k-runtime] 02/04: fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 4763127ae007a588f03ca8c9f8ba983be3dcb207
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 15:41:18 2019 +0100

    fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default
---
 .../component/knative/http/KnativeHttpTest.java    | 58 +++++++++++++++++++++-
 .../component/knative/KnativeConfiguration.java    | 19 +++++++
 .../camel/component/knative/KnativeEndpoint.java   |  3 +-
 .../component/knative/KnativeReplyProcessor.java   | 57 +++++++++++++++++++++
 4 files changed, 135 insertions(+), 2 deletions(-)

diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 0ff0014..2bc9f92 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -589,7 +589,9 @@ public class KnativeHttpTest {
                 from("knative:endpoint/from")
                     .convertBodyTo(String.class)
                     .setBody()
-                        .constant("consumer");
+                        .constant("consumer")
+                    .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
+                        .constant("custom");
                 from("direct:source")
                     .to("knative://endpoint/to")
                     .log("${body}")
@@ -599,6 +601,60 @@ public class KnativeHttpTest {
 
         MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
         mock.expectedBodiesReceived("consumer");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
+        mock.expectedMessageCount(1);
+
+        context.start();
+        context.createProducerTemplate().sendBody("direct:source", "");
+
+        mock.assertIsSatisfied();
+    }
+
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
+    void testReplyCloudEventHeaders(CloudEvent ce) throws Exception {
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.source,
+                "from",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )),
+            endpoint(
+                Knative.EndpointKind.sink,
+                "to",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/from?replyWithCloudEvent=true")
+                    .convertBodyTo(String.class)
+                    .setBody()
+                        .constant("consumer")
+                    .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
+                        .constant("custom");
+                from("direct:source")
+                    .to("knative://endpoint/to")
+                    .log("${body}")
+                    .to("mock:to");
+            }
+        });
+
+        MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
+        mock.expectedBodiesReceived("consumer");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
         mock.expectedMessageCount(1);
 
         context.start();
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 3911f85..6678054 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -47,6 +47,8 @@ public class KnativeConfiguration implements Cloneable {
     private String apiVersion;
     @UriParam(label = "advanced")
     private String kind;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean replyWithCloudEvent = false;
 
     public KnativeConfiguration() {
     }
@@ -79,6 +81,23 @@ public class KnativeConfiguration implements Cloneable {
         this.serviceName = serviceName;
     }
 
+    public boolean isReplyWithCloudEvent() {
+        return replyWithCloudEvent;
+    }
+
+    /**
+     * Transforms the reply into a cloud event that will be processed by the caller.
+     *
+     * When listening to events from a Knative Broker, if this flag is enabled, replies will
+     * be published to the same Broker the request came from (beware: unless you change the
+     * "type" of the received message, you may create a loop and receive your own reply back).
+     *
+     * When this flag is disabled (the default), CloudEvent headers are removed from the reply.
+     */
+    public void setReplyWithCloudEvent(boolean replyWithCloudEvent) {
+        this.replyWithCloudEvent = replyWithCloudEvent;
+    }
+
     @Deprecated
     public boolean isJsonSerializationEnabled() {
         return jsonSerializationEnabled;
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 9aaa45e..e713f36 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -92,7 +92,8 @@ public class KnativeEndpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
         final Processor ceProcessor = cloudEvent.consumer(this, service);
-        final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
+        final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
+        final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
         final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
 
         PropertyBindingSupport.build()
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
new file mode 100644
index 0000000..e1ac4a5
--- /dev/null
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.knative.ce.CloudEventProcessor;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+
+/**
+ * Post-processes the reply of a Knative consumer: delegates to the CloudEvent producer processor when enabled, otherwise strips CloudEvent HTTP headers.
+ */
+public class KnativeReplyProcessor extends DelegateAsyncProcessor {
+
+    private final boolean cloudEventEnabled;
+
+    private final CloudEventProcessor cloudEventProcessor;
+
+    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
+        super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);
+
+        this.cloudEventEnabled = cloudEventEnabled;
+        this.cloudEventProcessor = cloudEventProcessor;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (cloudEventEnabled) {
+            // Reply stays a CloudEvent: hand it to the producer-side processor passed to super() in the constructor
+            return processor.process(exchange, callback);
+        }
+
+        // Plain reply: remove the header named by http() for every attribute of the configured CloudEvent, then complete synchronously
+        for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
+            exchange.getMessage().removeHeader(attr.http());
+        }
+        callback.done(true);
+        return true;
+    }
+
+}