You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/12/13 18:04:59 UTC
[camel-k-runtime] 02/04: fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 4763127ae007a588f03ca8c9f8ba983be3dcb207
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Dec 13 15:41:18 2019 +0100
fix https://github.com/apache/camel-k/issues/1119: do not return cloud events by default
---
.../component/knative/http/KnativeHttpTest.java | 58 +++++++++++++++++++++-
.../component/knative/KnativeConfiguration.java | 19 +++++++
.../camel/component/knative/KnativeEndpoint.java | 3 +-
.../component/knative/KnativeReplyProcessor.java | 57 +++++++++++++++++++++
4 files changed, 135 insertions(+), 2 deletions(-)
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 0ff0014..2bc9f92 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -589,7 +589,9 @@ public class KnativeHttpTest {
from("knative:endpoint/from")
.convertBodyTo(String.class)
.setBody()
- .constant("consumer");
+ .constant("consumer")
+ .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
+ .constant("custom");
from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
@@ -599,6 +601,60 @@ public class KnativeHttpTest {
MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
+ mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
+ mock.expectedMessageCount(1);
+
+ context.start();
+ context.createProducerTemplate().sendBody("direct:source", "");
+
+ mock.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @EnumSource(CloudEvents.class)
+ void testReplyCloudEventHeaders(CloudEvent ce) throws Exception {
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.source,
+ "from",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )),
+ endpoint(
+ Knative.EndpointKind.sink,
+ "to",
+ "localhost",
+ port,
+ KnativeSupport.mapOf(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ ))
+ );
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("knative:endpoint/from?replyWithCloudEvent=true")
+ .convertBodyTo(String.class)
+ .setBody()
+ .constant("consumer")
+ .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())
+ .constant("custom");
+ from("direct:source")
+ .to("knative://endpoint/to")
+ .log("${body}")
+ .to("mock:to");
+ }
+ });
+
+ MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
+ mock.expectedBodiesReceived("consumer");
+ mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
mock.expectedMessageCount(1);
context.start();
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 3911f85..6678054 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -47,6 +47,8 @@ public class KnativeConfiguration implements Cloneable {
private String apiVersion;
@UriParam(label = "advanced")
private String kind;
+ @UriParam(label = "consumer", defaultValue = "false")
+ private boolean replyWithCloudEvent = false;
public KnativeConfiguration() {
}
@@ -79,6 +81,23 @@ public class KnativeConfiguration implements Cloneable {
this.serviceName = serviceName;
}
+ public boolean isReplyWithCloudEvent() {
+ return replyWithCloudEvent; // false unless changed through setReplyWithCloudEvent (the field is initialised to false)
+ }
+
+ /**
+ * Sets whether the reply is transformed into a cloud event that will be processed by the caller.
+ *
+ * When listening to events from a Knative Broker, if this flag is enabled, replies will
+ * be published to the same Broker the request came from (beware that if you don't
+ * change the "type" of the received message, you may create a loop and receive your own reply).
+ *
+ * When this flag is disabled (the default), CloudEvent headers are removed from the reply.
+ */
+ public void setReplyWithCloudEvent(boolean replyWithCloudEvent) {
+ this.replyWithCloudEvent = replyWithCloudEvent;
+ }
+
@Deprecated
public boolean isJsonSerializationEnabled() {
return jsonSerializationEnabled;
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 9aaa45e..e713f36 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -92,7 +92,8 @@ public class KnativeEndpoint extends DefaultEndpoint {
public Consumer createConsumer(Processor processor) throws Exception {
final KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(Knative.EndpointKind.source);
final Processor ceProcessor = cloudEvent.consumer(this, service);
- final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
+ final Processor replyProcessor = new KnativeReplyProcessor(this, service, cloudEvent, configuration.isReplyWithCloudEvent());
+ final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor);
final Consumer consumer = getComponent().getTransport().createConsumer(this, service, pipeline);
PropertyBindingSupport.build()
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
new file mode 100644
index 0000000..e1ac4a5
--- /dev/null
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeReplyProcessor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.knative.ce.CloudEventProcessor;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.camel.support.processor.DelegateAsyncProcessor;
+
+/**
+ * Post-processes replies returned by the consumer: when cloud events are enabled the reply is handed to the CloudEvent producer processor, otherwise the CloudEvent attribute headers are removed from it.
+ */
+public class KnativeReplyProcessor extends DelegateAsyncProcessor {
+
+ private final boolean cloudEventEnabled;
+
+ private final CloudEventProcessor cloudEventProcessor;
+
+ public KnativeReplyProcessor(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service, CloudEventProcessor cloudEventProcessor, boolean cloudEventEnabled) {
+ super(cloudEventEnabled ? cloudEventProcessor.producer(endpoint, service) : null); // the producer delegate is only created when cloud events are enabled
+
+ this.cloudEventEnabled = cloudEventEnabled;
+ this.cloudEventProcessor = cloudEventProcessor;
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (cloudEventEnabled) {
+ // Cloud events enabled: delegate to the CloudEvent producer processor passed to super() in the constructor
+ return processor.process(exchange, callback);
+ }
+
+ // Cloud events disabled: remove the HTTP header of every CloudEvent attribute from the reply message
+ for (CloudEvent.Attribute attr : cloudEventProcessor.cloudEvent().attributes()) {
+ exchange.getMessage().removeHeader(attr.http());
+ }
+ callback.done(true);
+ return true;
+ }
+
+}