You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/12/13 18:05:00 UTC
[camel-k-runtime] 03/04: fix
https://github.com/apache/camel-k/issues/1119: moving transport related
stuff out of core
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit e88731ec5d71d158d3d237a286dad8aada235660
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 17:47:03 2019 +0100
fix https://github.com/apache/camel-k/issues/1119: moving transport related stuff out of core
---
.../component/knative/spi/KnativeTransport.java | 4 +++
.../knative/spi/KnativeTransportConfiguration.java | 38 ++++++++++++++++++++++
.../component/knative/http/KnativeHttpSupport.java | 26 ++++++++++++++-
.../knative/http/KnativeHttpTransport.java | 12 +++++--
.../component/knative/KnativeConfiguration.java | 2 +-
.../camel/component/knative/KnativeEndpoint.java | 17 +++++++---
.../component/knative/KnativeReplyProcessor.java | 8 ++---
7 files changed, 91 insertions(+), 16 deletions(-)
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
index e9936a8..6b0cb55 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransport.java
@@ -27,21 +27,25 @@ public interface KnativeTransport extends Service {
* Create a camel {@link Producer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the producer should be created
+ * @param configuration the general transport configuration
* @param service the service definition containing information about how to reach the target service.
* @return the {@link Producer} to use in place of the original endpoint's producer.
*/
Producer createProducer(
Endpoint endpoint,
+ KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeServiceDefinition service);
/**
* Create a camel {@link Consumer} in place of the original endpoint for a specific protocol.
*
* @param endpoint the endpoint for which the consumer should be created.
+ * @param configuration the general transport configuration
* @param service the service definition containing information about how to make the route reachable from Knative.
* @return the {@link Consumer} to use in place of the original endpoint's consumer.
*/
Consumer createConsumer(
Endpoint endpoint,
+ KnativeTransportConfiguration configuration,
KnativeEnvironment.KnativeServiceDefinition service, Processor processor);
}
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java
new file mode 100644
index 0000000..fc94034
--- /dev/null
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeTransportConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.spi;
+
+public final class KnativeTransportConfiguration {
+
+ private final CloudEvent cloudEvent;
+
+ private final boolean removeCloudEventHeadersInReply;
+
+ public KnativeTransportConfiguration(CloudEvent cloudEvent, boolean removeCloudEventHeadersInReply) {
+ this.cloudEvent = cloudEvent;
+ this.removeCloudEventHeadersInReply = removeCloudEventHeadersInReply;
+ }
+
+ public CloudEvent getCloudEvent() {
+ return cloudEvent;
+ }
+
+ public boolean isRemoveCloudEventHeadersInReply() {
+ return removeCloudEventHeadersInReply;
+ }
+
+}
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index 112ad58..faeba6a 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -24,11 +24,16 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import io.vertx.core.http.HttpServerRequest;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.util.ObjectHelper;
-public final class KnativeHttpSupport {
+public final class KnativeHttpSupport {
private KnativeHttpSupport() {
}
@@ -94,4 +99,23 @@ public final class KnativeHttpSupport {
return true;
};
}
+
+ /**
+ * Removes cloud event headers at the end of the processing.
+ */
+ public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
+ return new DelegateAsyncProcessor(delegate) {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ return processor.process(exchange, doneSync -> {
+ // remove CloudEvent headers
+ for (CloudEvent.Attribute attr : ce.attributes()) {
+ exchange.getMessage().removeHeader(attr.http());
+ }
+ callback.done(doneSync);
+ });
+ }
+ };
+ }
+
}
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index 0f255a5..5850a53 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -35,6 +35,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeTransport;
+import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -198,12 +199,17 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContext
// *****************************
@Override
- public Producer createProducer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+ public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service) {
return new KnativeHttpProducer(this, endpoint, service, vertx, vertxHttpClientOptions);
}
@Override
- public Consumer createConsumer(Endpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
- return new KnativeHttpConsumer(this, endpoint, service, processor);
+ public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
+ Processor next = processor;
+ if (config.isRemoveCloudEventHeadersInReply()) {
+ next = KnativeHttpSupport.withoutCloudEventHeaders(processor, config.getCloudEvent());
+ }
+ return new KnativeHttpConsumer(this, endpoint, service, next);
}
+
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 6678054..1057782 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -48,7 +48,7 @@ public class KnativeConfiguration implements Cloneable {
@UriParam(label = "advanced")
private String kind;
@UriParam(label = "consumer", defaultValue = "false")
- private boolean replyWithCloudEvent = false;
+ private boolean replyWithCloudEvent;
public KnativeConfiguration() {
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index e713f36..37f936b 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.camel.component.knative.ce.CloudEventProcessors;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.component.knative.spi.KnativeTransportConfiguration;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
@@ -36,7 +37,6 @@ import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.PropertyBindingSupport;
-
/**
* This component allows interacting with Knative events.
*/
@@ -76,7 +76,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.sink);
final Processor ceProcessor = cloudEvent.producer(this, service);
final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
- final Producer producer = getComponent().getTransport().createProducer(this, service);
+ final Producer producer = getComponent().getTransport().createProducer(this, createTransportConfiguration(), service);
PropertyBindingSupport.build()
.withCamelContext(getCamelContext())
@@ -94,7 +94,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
final Processor ceProcessor = cloudEvent.consumer(this, service);
final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
- final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
+ final Consumer consumer = getComponent().getTransport().createConsumer(this, createTransportConfiguration(), service, pipeline);
PropertyBindingSupport.build()
.withCamelContext(getCamelContext())
@@ -149,7 +149,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
Map<String, String> metadata = new HashMap<>();
metadata.putAll(service.get().getMetadata());
- for (Map.Entry<String, Object> entry: configuration.getFilters().entrySet()) {
+ for (Map.Entry<String, Object> entry : configuration.getFilters().entrySet()) {
String key = entry.getKey();
Object val = entry.getValue();
@@ -162,7 +162,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
}
}
- for (Map.Entry<String, Object> entry: configuration.getCeOverride().entrySet()) {
+ for (Map.Entry<String, Object> entry : configuration.getCeOverride().entrySet()) {
String key = entry.getKey();
Object val = entry.getValue();
@@ -211,4 +211,11 @@ public class KnativeEndpoint extends DefaultEndpoint {
})
.findFirst();
}
+
+ private KnativeTransportConfiguration createTransportConfiguration() {
+ return new KnativeTransportConfiguration(
+ this.cloudEvent.cloudEvent(),
+ !this.configuration.isReplyWithCloudEvent()
+ );
+ }
}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
index e1ac4a5..30bc548 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.knative;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.knative.ce.CloudEventProcessor;
-import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -32,7 +31,8 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {
private final CloudEventProcessor cloudEventProcessor;
- public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
+ public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor,
+ boolean cloudEventEnabled) {
super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null);
this.cloudEventEnabled = cloudEventEnabled;
@@ -46,10 +46,6 @@ public class KnativeReplyProcessor extends DelegateAsyncProcessor {
return processor.process(exchange, callback);
}
- // remove CloudEvent headers
- for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
- exchange.getMessage().removeHeader(attr.http());
- }
callback.done(true);
return true;
}