You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/12/13 18:04:57 UTC

[camel-k-runtime] branch master updated (af7332c -> 5fd7e2f)

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git.


    from af7332c  Merge pull request #207 from lburgazzoli/route-discovery
     new 3ab1f05  chore(deprecate): json serialization no longer used
     new 4763127  fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default
     new e88731e  fix https://github.com/apache/camel-k/issues/1119: moving transport related stuff out of core
     new 5fd7e2f  fix https://github.com/apache/camel-k/issues/1119: removing unneeded wrapper for reply processor

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/knative/spi/KnativeTransport.java    |  4 ++
 ...ter.java => KnativeTransportConfiguration.java} | 22 +++++---
 .../component/knative/http/KnativeHttpSupport.java | 26 +++++++++-
 .../knative/http/KnativeHttpTransport.java         | 12 +++--
 .../component/knative/http/KnativeHttpTest.java    | 58 +++++++++++++++++++++-
 .../camel/component/knative/KnativeComponent.java  |  2 +
 .../component/knative/KnativeConfiguration.java    | 22 ++++++++
 .../knative/KnativeConversionProcessor.java        |  1 +
 .../camel/component/knative/KnativeEndpoint.java   | 23 ++++++---
 9 files changed, 152 insertions(+), 18 deletions(-)
 copy camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/{CloudEventTypeConverter.java => KnativeTransportConfiguration.java} (61%)


[camel-k-runtime] 03/04: fix https://github.com/apache/camel-k/issues/1119: moving transport related stuff out of core

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit e88731ec5d71d158d3d237a286dad8aada235660
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 17:47:03 2019 +0100

    fix https://github.com/apache/camel-k/issues/1119: moving transport related stuff out of core
---
 .../component/knative/spi/KnativeTransport.java    |  4 +++
 .../knative/spi/KnativeTransportConfiguration.java | 38 ++++++++++++++++++++++
 .../component/knative/http/KnativeHttpSupport.java | 26 ++++++++++++++-
 .../knative/http/KnativeHttpTransport.java         | 12 +++++--
 .../component/knative/KnativeConfiguration.java    |  2 +-
 .../camel/component/knative/KnativeEndpoint.java   | 17 +++++++---
 .../component/knative/KnativeReplyProcessor.java   |  8 ++---
 7 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
index e9936a8..6b0cb55 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
@@ -27,21 +27,25 @@ public interface KnativeTransport extends Service {
      * Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
      *
      * @param endpoint the endpoint for which the producer should be created
+     * @param configuration the general transport configuration
      * @param service the service definition containing information about how make reach the target service.
      * @return
      */
     Producer createProducer(
         Endpoint endpoint,
+        KnativeTransportConfiguration configuration,
         KnativeEnvironment.KnativeServiceDefinition service);
 
     /**
      * Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
      *
      * @param endpoint the endpoint for which the consumer should be created.
+     * @param configuration the general transport configuration
      * @param service the service definition containing information about how make the route reachable from knative.
      * @return
      */
     Consumer createConsumer(
         Endpoint endpoint,
+        KnativeTransportConfiguration configuration,
         KnativeEnvironment.KnativeServiceDefinition service, Processor processor);
 }
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java
new file mode 100644
index 0000000..fc94034
--- /dev/null
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.spi;
+
+public final class KnativeTransportConfiguration {
+
+    private final CloudEvent cloudEvent;
+
+    private final boolean removeCloudEventHeadersInReply;
+
+    public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) {
+        this.cloudEvent = cloudEvent;
+        this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply;
+    }
+
+    public CloudEvent getCloudEvent() {
+        return cloudEvent;
+    }
+
+    public boolean isRemoveCloudEventHeadersInReply() {
+        return removeCloudEventHeadersInReply;
+    }
+
+}
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index 112ad58..faeba6a 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -24,11 +24,16 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import io.vertx.core.http.HttpServerRequest;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.ObjectHelper;
 
-public final  class KnativeHttpSupport {
+public final class KnativeHttpSupport {
     private KnativeHttpSupport() {
     }
 
@@ -94,4 +99,23 @@ public final  class KnativeHttpSupport {
             return true;
         };
     }
+
+    /**
+     * Removes cloud event headers at the end of the processing.
+     */
+    public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
+        return new DelegateAsyncProcessor(delegate) {
+            @Override
+            public boolean process(Exchange exchange, AsyncCallback callback) {
+                return processor.process(exchange, doneSync -> {
+                    // remove CloudEvent headers
+                    for (CloudEvent.Attribute attr : ce.attributes()) {
+                        exchange.getMessage().removeHeader(attr.http());
+                    }
+                    callback.done(doneSync);
+                });
+            }
+        };
+    }
+
 }
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index 0f255a5..5850a53 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -35,6 +35,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.component.knative.spi.KnativeTransport;
+import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -198,12 +199,17 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContext
     // *****************************
 
     @Override
-    public Producer createProducer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+    public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) {
         return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions);
     }
 
     @Override
-    public Consumer createConsumer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
-        return new KnativeHttpConsumer(this, endpoint, service, processor);
+    public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
+        Processor next = processor;
+        if (config.isRemoveCloudEventHeadersInReply()) {
+            next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent());
+        }
+        return new KnativeHttpConsumer(this, endpoint, service, next);
     }
+
 }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 6678054..1057782 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -48,7 +48,7 @@ public class KnativeConfiguration implements Cloneable {
     @UriParam(label = "advanced")
     private String kind;
     @UriParam(label = "consumer", defaultValue = "false")
-    private boolean replyWithCloudEvent = false;
+    private boolean replyWithCloudEvent;
 
     public KnativeConfiguration() {
     }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index e713f36..37f936b 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.camel.component.knative.ce.CloudEventProcessors;
 import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -36,7 +37,6 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.support.PropertyBindingSupport;
 
-
 /**
  * This component allows to interact with KNative events.
  */
@@ -76,7 +76,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.sink);
         final Processor ceProcessor = cloudEvent.producer(this, service);
         final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
-        final Producer producer = getComponent().getTransport().createProducer(this, service);
+        final Producer producer = getComponent().getTransport().createProducer(this, createTransportConfiguration(), service);
 
         PropertyBindingSupport.build()
             .withCamelContext(getCamelContext())
@@ -94,7 +94,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         final Processor ceProcessor = cloudEvent.consumer(this, service);
         final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
         final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
-        final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
+        final Consumer consumer = getComponent().getTransport().createConsumer(this, createTransportConfiguration(), service, pipeline);
 
         PropertyBindingSupport.build()
             .withCamelContext(getCamelContext())
@@ -149,7 +149,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         Map<String, String> metadata = new HashMap<>();
         metadata.putAll(service.get().getMetadata());
 
-        for (Map.Entry<String, Object> entry: configuration.getFilters().entrySet()) {
+        for (Map.Entry<String, Object> entry : configuration.getFilters().entrySet()) {
             String key = entry.getKey();
             Object val = entry.getValue();
 
@@ -162,7 +162,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
             }
         }
 
-        for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
+        for (Map.Entry<String, Object> entry : configuration.getCeOverride().entrySet()) {
             String key = entry.getKey();
             Object val = entry.getValue();
 
@@ -211,4 +211,11 @@ public class KnativeEndpoint extends DefaultEndpoint {
             })
             .findFirst();
     }
+
+    private KnativeTransportConfiguration createTransportConfiguration() {
+        return new KnativeTransportConfiguration(
+            this.cloudEvent.cloudEvent(),
+            !this.configuration.isReplyWithCloudEvent()
+        );
+    }
 }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
index e1ac4a5..30bc548 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.knative;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.knative.ce.CloudEventProcessor;
-import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
@@ -32,7 +31,8 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {
 
     private final CloudEventProcessor cloudEventProcessor;
 
-    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
+    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor,
+                                 boolean cloudEventEnabled) {
         super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);
 
         this.cloudEventEnabled = cloudEventEnabled;
@@ -46,10 +46,6 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {
             return processor.process(exchange, callback);
         }
 
-        // remove CloudEvent headers
-        for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
-            exchange.getMessage().removeHeader(attr.http());
-        }
         callback.done(true);
         return true;
     }


[camel-k-runtime] 01/04: chore(deprecate): json serialization no longer used

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 3ab1f05e513d5e9857de8facc66ed1cca0f4714d
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 10:10:45 2019 +0100

    chore(deprecate): json serialization no longer used
---
 .../main/java/org/apache/camel/component/knative/KnativeComponent.java | 2 ++
 .../java/org/apache/camel/component/knative/KnativeConfiguration.java  | 3 +++
 .../org/apache/camel/component/knative/KnativeConversionProcessor.java | 1 +
 3 files changed, 6 insertions(+)

diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 8a2329a..987ca63 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -103,10 +103,12 @@ public class KnativeComponent extends DefaultComponent {
         configuration.setEnvironment(environment);
     }
 
+    @Deprecated
     public boolean isJsonSerializationEnabled() {
         return configuration.isJsonSerializationEnabled();
     }
 
+    @Deprecated
     public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
         configuration.setJsonSerializationEnabled(jsonSerializationEnabled);
     }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index eb93fb7..3911f85 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -31,6 +31,7 @@ public class KnativeConfiguration implements Cloneable {
     @UriParam
     private String serviceName;
     @UriParam(defaultValue = "false")
+    @Deprecated
     private boolean jsonSerializationEnabled;
     @UriParam(defaultValue = "0.3", enums = "0.1,0.2,0.3")
     private String cloudEventsSpecVersion = CloudEvents.V03.version();
@@ -78,6 +79,7 @@ public class KnativeConfiguration implements Cloneable {
         this.serviceName = serviceName;
     }
 
+    @Deprecated
     public boolean isJsonSerializationEnabled() {
         return jsonSerializationEnabled;
     }
@@ -85,6 +87,7 @@ public class KnativeConfiguration implements Cloneable {
     /**
      * Enables automatic serialization to JSON of the produced events.
      */
+    @Deprecated
     public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
         this.jsonSerializationEnabled = jsonSerializationEnabled;
     }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
index 6573820..433fa9e 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConversionProcessor.java
@@ -23,6 +23,7 @@ import org.apache.camel.component.knative.spi.Knative;
 /**
  * Converts objects prior to serializing them to external endpoints or channels
  */
+@Deprecated
 public class KnativeConversionProcessor implements Processor {
 
     private boolean enabled;


[camel-k-runtime] 02/04: fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 4763127ae007a588f03ca8c9f8ba983be3dcb207
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 15:41:18 2019 +0100

    fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default
---
 .../component/knative/http/KnativeHttpTest.java    | 58 +++++++++++++++++++++-
 .../component/knative/KnativeConfiguration.java    | 19 +++++++
 .../camel/component/knative/KnativeEndpoint.java   |  3 +-
 .../component/knative/KnativeReplyProcessor.java   | 57 +++++++++++++++++++++
 4 files changed, 135 insertions(+), 2 deletions(-)

diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 0ff0014..2bc9f92 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -589,7 +589,9 @@ public class KnativeHttpTest {
                 from("knative:endpoint/from")
                     .convertBodyTo(String.class)
                     .setBody()
-                        .constant("consumer");
+                        .constant("consumer")
+                    .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
+                        .constant("custom");
                 from("direct:source")
                     .to("knative://endpoint/to")
                     .log("${body}")
@@ -599,6 +601,60 @@ public class KnativeHttpTest {
 
         MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
         mock.expectedBodiesReceived("consumer");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
+        mock.expectedMessageCount(1);
+
+        context.start();
+        context.createProducerTemplate().sendBody("direct:source", "");
+
+        mock.assertIsSatisfied();
+    }
+
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
+    void testReplyCloudEventHeaders(CloudEvent ce) throws Exception {
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.source,
+                "from",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )),
+            endpoint(
+                Knative.EndpointKind.sink,
+                "to",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/from?replyWithCloudEvent=true")
+                    .convertBodyTo(String.class)
+                    .setBody()
+                        .constant("consumer")
+                    .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
+                        .constant("custom");
+                from("direct:source")
+                    .to("knative://endpoint/to")
+                    .log("${body}")
+                    .to("mock:to");
+            }
+        });
+
+        MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
+        mock.expectedBodiesReceived("consumer");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
         mock.expectedMessageCount(1);
 
         context.start();
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 3911f85..6678054 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -47,6 +47,8 @@ public class KnativeConfiguration implements Cloneable {
     private String apiVersion;
     @UriParam(label = "advanced")
     private String kind;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean replyWithCloudEvent = false;
 
     public KnativeConfiguration() {
     }
@@ -79,6 +81,23 @@ public class KnativeConfiguration implements Cloneable {
         this.serviceName = serviceName;
     }
 
+    public boolean isReplyWithCloudEvent() {
+        return replyWithCloudEvent;
+    }
+
+    /**
+     * Transforms the reply into a cloud event that will be processed by the caller.
+     *
+     * When listening to events from a Knative Broker, if this flag is enabled, replies will
+     * be published to the same Broker where the request comes from (beware that if you don't
+     * change the "type" of the received message, you may create a loop and receive your same reply).
+     *
+     * When this flag is disabled, CloudEvent headers are removed from the reply.
+     */
+    public void setReplyWithCloudEvent(boolean replyWithCloudEvent) {
+        this.replyWithCloudEvent = replyWithCloudEvent;
+    }
+
     @Deprecated
     public boolean isJsonSerializationEnabled() {
         return jsonSerializationEnabled;
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 9aaa45e..e713f36 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -92,7 +92,8 @@ public class KnativeEndpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
         final Processor ceProcessor = cloudEvent.consumer(this, service);
-        final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
+        final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
+        final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
         final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
 
         PropertyBindingSupport.build()
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
new file mode 100644
index 0000000..e1ac4a5
--- /dev/null
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.knative.ce.CloudEventProcessor;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+
+/**
+ * The KnativeReplyProcessor handles the processing of replies returned by the consumer.
+ */
+public class KnativeReplyProcessor extends DelegateAsyncProcessor {
+
+    private final boolean cloudEventEnabled;
+
+    private final CloudEventProcessor cloudEventProcessor;
+
+    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
+        super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);
+
+        this.cloudEventEnabled = cloudEventEnabled;
+        this.cloudEventProcessor = cloudEventProcessor;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (cloudEventEnabled) {
+            // Delegate to CloudEvent processor
+            return processor.process(exchange, callback);
+        }
+
+        // remove CloudEvent headers
+        for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
+            exchange.getMessage().removeHeader(attr.http());
+        }
+        callback.done(true);
+        return true;
+    }
+
+}


[camel-k-runtime] 04/04: fix https://github.com/apache/camel-k/issues/1119: removing unneeded wrapper for reply processor

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 5fd7e2f4f59e740c98a899872b09ffcf8dc91da7
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 17:55:26 2019 +0100

    fix https://github.com/apache/camel-k/issues/1119: removing unneeded wrapper for reply processor
---
 .../camel/component/knative/KnativeEndpoint.java   |  5 +-
 .../component/knative/KnativeReplyProcessor.java   | 53 ----------------------
 2 files changed, 4 insertions(+), 54 deletions(-)

diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 37f936b..b27b3bb 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -92,7 +92,10 @@ public class KnativeEndpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
         final Processor ceProcessor = cloudEvent.consumer(this, service);
-        final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
+        Processor replyProcessor = null;
+        if (configuration.isReplyWithCloudEvent()) {
+            replyProcessor = cloudEvent.producer(this, service);
+        }
         final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
         final Consumer consumer = getComponent().getTransport().createConsumer(this, createTransportConfiguration(), service, pipeline);
 
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
deleted file mode 100644
index 30bc548..0000000
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.component.knative.ce.CloudEventProcessor;
-import org.apache.camel.component.knative.spi.KnativeEnvironment;
-import org.apache.camel.support.processor.DelegateAsyncProcessor;
-
-/**
- * The KnativeReplyProcessor handles the processing of replies returned by the consumer.
- */
-public class KnativeReplyProcessor extends DelegateAsyncProcessor {
-
-    private final boolean cloudEventEnabled;
-
-    private final CloudEventProcessor cloudEventProcessor;
-
-    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor,
-                                 boolean cloudEventEnabled) {
-        super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);
-
-        this.cloudEventEnabled = cloudEventEnabled;
-        this.cloudEventProcessor = cloudEventProcessor;
-    }
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        if (cloudEventEnabled) {
-            // Delegate to CloudEvent processor
-            return processor.process(exchange, callback);
-        }
-
-        callback.done(true);
-        return true;
-    }
-
-}