You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/12/13 18:05:00 UTC

[camel-k-runtime] 03/04: fix https://github.com/apache/camel-k/issues/1119: moving transport related stuff out of core

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit e88731ec5d71d158d3d237a286dad8aada235660
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 17:47:03 2019 +0100

    fix https://github.com/apache/camel-k/issues/1119: moving transport related stuff out of core
---
 .../component/knative/spi/KnativeTransport.java    |  4 +++
 .../knative/spi/KnativeTransportConfiguration.java | 38 ++++++++++++++++++++++
 .../component/knative/http/KnativeHttpSupport.java | 26 ++++++++++++++-
 .../knative/http/KnativeHttpTransport.java         | 12 +++++--
 .../component/knative/KnativeConfiguration.java    |  2 +-
 .../camel/component/knative/KnativeEndpoint.java   | 17 +++++++---
 .../component/knative/KnativeReplyProcessor.java   |  8 ++---
 7 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
index e9936a8..6b0cb55 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
@@ -27,21 +27,25 @@ public interface KnativeTransport extends Service {
      * Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
      *
      * @param endpoint the endpoint for which the producer should be created
+     * @param configuration the general transport configuration
      * @param service the service definition containing information about how make reach the target service.
      * @return
      */
     Producer createProducer(
         Endpoint endpoint,
+        KnativeTransportConfiguration configuration,
         KnativeEnvironment.KnativeServiceDefinition service);
 
     /**
      * Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
      *
      * @param endpoint the endpoint for which the consumer should be created.
+     * @param configuration the general transport configuration
      * @param service the service definition containing information about how make the route reachable from knative.
      * @return
      */
     Consumer createConsumer(
         Endpoint endpoint,
+        KnativeTransportConfiguration configuration,
         KnativeEnvironment.KnativeServiceDefinition service, Processor processor);
 }
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java
new file mode 100644
index 0000000..fc94034
--- /dev/null
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.spi;
+
+public final class KnativeTransportConfiguration {
+
+    private final CloudEvent cloudEvent;
+
+    private final boolean removeCloudEventHeadersInReply;
+
+    public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) {
+        this.cloudEvent = cloudEvent;
+        this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply;
+    }
+
+    public CloudEvent getCloudEvent() {
+        return cloudEvent;
+    }
+
+    public boolean isRemoveCloudEventHeadersInReply() {
+        return removeCloudEventHeadersInReply;
+    }
+
+}
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index 112ad58..faeba6a 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -24,11 +24,16 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import io.vertx.core.http.HttpServerRequest;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.ObjectHelper;
 
-public final  class KnativeHttpSupport {
+public final class KnativeHttpSupport {
     private KnativeHttpSupport() {
     }
 
@@ -94,4 +99,23 @@ public final  class KnativeHttpSupport {
             return true;
         };
     }
+
+    /**
+     * Removes cloud event headers at the end of the processing.
+     */
+    public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
+        return new DelegateAsyncProcessor(delegate) {
+            @Override
+            public boolean process(Exchange exchange, AsyncCallback callback) {
+                return processor.process(exchange, doneSync -> {
+                    // remove CloudEvent headers
+                    for (CloudEvent.Attribute attr : ce.attributes()) {
+                        exchange.getMessage().removeHeader(attr.http());
+                    }
+                    callback.done(doneSync);
+                });
+            }
+        };
+    }
+
 }
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index 0f255a5..5850a53 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -35,6 +35,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.component.knative.spi.KnativeTransport;
+import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -198,12 +199,17 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContext
     // *****************************
 
     @Override
-    public Producer createProducer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+    public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) {
         return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions);
     }
 
     @Override
-    public Consumer createConsumer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
-        return new KnativeHttpConsumer(this, endpoint, service, processor);
+    public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
+        Processor next = processor;
+        if (config.isRemoveCloudEventHeadersInReply()) {
+            next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent());
+        }
+        return new KnativeHttpConsumer(this, endpoint, service, next);
     }
+
 }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 6678054..1057782 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -48,7 +48,7 @@ public class KnativeConfiguration implements Cloneable {
     @UriParam(label = "advanced")
     private String kind;
     @UriParam(label = "consumer", defaultValue = "false")
-    private boolean replyWithCloudEvent = false;
+    private boolean replyWithCloudEvent;
 
     public KnativeConfiguration() {
     }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index e713f36..37f936b 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.camel.component.knative.ce.CloudEventProcessors;
 import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -36,7 +37,6 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.support.PropertyBindingSupport;
 
-
 /**
  * This component allows to interact with KNative events.
  */
@@ -76,7 +76,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.sink);
         final Processor ceProcessor = cloudEvent.producer(this, service);
         final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
-        final Producer producer = getComponent().getTransport().createProducer(this, service);
+        final Producer producer = getComponent().getTransport().createProducer(this, createTransportConfiguration(), service);
 
         PropertyBindingSupport.build()
             .withCamelContext(getCamelContext())
@@ -94,7 +94,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         final Processor ceProcessor = cloudEvent.consumer(this, service);
         final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
         final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
-        final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
+        final Consumer consumer = getComponent().getTransport().createConsumer(this, createTransportConfiguration(), service, pipeline);
 
         PropertyBindingSupport.build()
             .withCamelContext(getCamelContext())
@@ -149,7 +149,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
         Map<String, String> metadata = new HashMap<>();
         metadata.putAll(service.get().getMetadata());
 
-        for (Map.Entry<String, Object> entry: configuration.getFilters().entrySet()) {
+        for (Map.Entry<String, Object> entry : configuration.getFilters().entrySet()) {
             String key = entry.getKey();
             Object val = entry.getValue();
 
@@ -162,7 +162,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
             }
         }
 
-        for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
+        for (Map.Entry<String, Object> entry : configuration.getCeOverride().entrySet()) {
             String key = entry.getKey();
             Object val = entry.getValue();
 
@@ -211,4 +211,11 @@ public class KnativeEndpoint extends DefaultEndpoint {
             })
             .findFirst();
     }
+
+    private KnativeTransportConfiguration createTransportConfiguration() {
+        return new KnativeTransportConfiguration(
+            this.cloudEvent.cloudEvent(),
+            !this.configuration.isReplyWithCloudEvent()
+        );
+    }
 }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
index e1ac4a5..30bc548 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.knative;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.knative.ce.CloudEventProcessor;
-import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
@@ -32,7 +31,8 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {
 
     private final CloudEventProcessor cloudEventProcessor;
 
-    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
+    public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor,
+                                 boolean cloudEventEnabled) {
         super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);
 
         this.cloudEventEnabled = cloudEventEnabled;
@@ -46,10 +46,6 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {
             return processor.process(exchange, callback);
         }
 
-        // remove CloudEvent headers
-        for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
-            exchange.getMessage().removeHeader(attr.http());
-        }
         callback.done(true);
         return true;
     }