You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/07/17 11:19:02 UTC
[camel-k-runtime] branch master updated: camel-knative: don't
replace message when the producers completes #399
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push:
new b72dc22 camel-knative: don't replace message when the producers completes #399
b72dc22 is described below
commit b72dc22bd925e07c598ea512078b3f492b45534f
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Thu Jul 16 11:17:39 2020 +0200
camel-knative: don't replace message when the producers completes #399
---
.../knative/http/KnativeHttpConsumer.java | 5 +
.../knative/http/KnativeHttpProducer.java | 9 +-
.../component/knative/http/KnativeHttpSupport.java | 51 +--
.../knative/http/KnativeHttpTransport.java | 8 +-
.../component/knative/http/KnativeHttpServer.java | 33 +-
.../component/knative/http/KnativeHttpTest.java | 507 +++++++++++----------
.../knative/http/KnativeHttpTestSupport.java | 4 +
.../http/assertions/HttpServerRequestAssert.java | 66 +++
.../component/knative/KnativeConfiguration.java | 6 +-
9 files changed, 405 insertions(+), 284 deletions(-)
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index fc83036..fb60431 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -287,6 +287,11 @@ public class KnativeHttpConsumer extends DefaultConsumer {
}
}
}
+
+ KnativeHttpSupport.remapCloudEventHeaders(configuration.getCloudEvent(), message);
+ if (configuration.isRemoveCloudEventHeadersInReply()) {
+ KnativeHttpSupport.removeCloudEventHeaders(configuration.getCloudEvent(), message);
+ }
}
return response;
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
index 67045d1..ca23300 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -35,7 +35,6 @@ import org.apache.camel.Message;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
@@ -117,18 +116,20 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
.sendBuffer(Buffer.buffer(payload), response -> {
if (response.succeeded()) {
HttpResponse<Buffer> result = response.result();
+ Message answer = exchange.getMessage();
- Message answer = new DefaultMessage(exchange.getContext());
answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
for (Map.Entry<String, String> entry : result.headers().entries()) {
if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
- KnativeHttpSupport.appendHeader(answer.getHeaders(), entry.getKey(), entry.getValue());
+ answer.setHeader(entry.getKey(), entry.getValue());
}
}
if (result.body() != null) {
answer.setBody(result.body().getBytes());
+ } else {
+ answer.setBody(null);
}
if (result.statusCode() < 200 || result.statusCode() >= 300) {
@@ -143,8 +144,6 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
}
answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
-
- exchange.setMessage(answer);
} else if (response.failed()) {
String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(this.uri.get());
if (response.result() != null) {
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index c74e0ab..67bb64f 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -24,14 +24,10 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import io.vertx.core.http.HttpServerRequest;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.Processor;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
-import org.apache.camel.support.processor.DelegateAsyncProcessor;
public final class KnativeHttpSupport {
private KnativeHttpSupport() {
@@ -97,46 +93,25 @@ public final class KnativeHttpSupport {
/**
* Removes cloud event headers at the end of the processing.
*/
- public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
- return new DelegateAsyncProcessor(delegate) {
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- return processor.process(exchange, doneSync -> {
- final Message message = exchange.getMessage();
-
- // remove CloudEvent headers
- for (CloudEvent.Attribute attr : ce.attributes()) {
- message.removeHeader(attr.http());
- }
-
- callback.done(doneSync);
- });
- }
- };
+ public static void removeCloudEventHeaders(CloudEvent ce, Message message) {
+ // remove CloudEvent headers
+ for (CloudEvent.Attribute attr : ce.attributes()) {
+ message.removeHeader(attr.http());
+ message.removeHeader(attr.id());
+ }
}
/**
* Remap camel headers to cloud event http headers.
*/
- public static Processor remapCloudEventHeaders(Processor delegate, CloudEvent ce) {
- return new DelegateAsyncProcessor(delegate) {
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- return processor.process(exchange, doneSync -> {
- final Message message = exchange.getMessage();
-
- // remap CloudEvent camel --> http
- for (CloudEvent.Attribute attr : ce.attributes()) {
- Object value = message.getHeader(attr.id());
- if (value != null) {
- message.setHeader(attr.http(), value);
- }
- }
-
- callback.done(doneSync);
- });
+ public static void remapCloudEventHeaders(CloudEvent ce, Message message) {
+ // remap CloudEvent camel --> http
+ for (CloudEvent.Attribute attr : ce.attributes()) {
+ Object value = message.getHeader(attr.id());
+ if (value != null) {
+ message.setHeader(attr.http(), value);
}
- };
+ }
}
}
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index af3f7d4..a9b0613 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -96,13 +96,7 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContext
@Override
public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
- Processor next = KnativeHttpSupport.remapCloudEventHeaders(processor, config.getCloudEvent());
-
- if (config.isRemoveCloudEventHeadersInReply()) {
- next = KnativeHttpSupport.withoutCloudEventHeaders(next, config.getCloudEvent());
- }
-
- return new KnativeHttpConsumer(config, endpoint, service, this.router, next);
+ return new KnativeHttpConsumer(config, endpoint, service, this.router, processor);
}
}
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
index cf3a507..eecbfe6 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
@@ -33,6 +33,7 @@ import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import org.apache.camel.CamelContext;
import org.apache.camel.component.platform.http.PlatformHttpConstants;
+import org.apache.camel.k.test.AvailablePortFinder;
import org.apache.camel.support.service.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,13 +45,17 @@ public class KnativeHttpServer extends ServiceSupport {
private final String host;
private final int port;
private final String path;
+ private final BlockingQueue<HttpServerRequest> requests;
+ private final Handler<RoutingContext> handler;
private Vertx vertx;
private Router router;
private ExecutorService executor;
private HttpServer server;
- private BlockingQueue<HttpServerRequest> requests;
- private Handler<RoutingContext> handler;
+
+ public KnativeHttpServer(CamelContext context) {
+ this(context, "localhost", AvailablePortFinder.getNextAvailable(), "/", null);
+ }
public KnativeHttpServer(CamelContext context, int port) {
this(context, "localhost", port, "/", null);
@@ -60,10 +65,22 @@ public class KnativeHttpServer extends ServiceSupport {
this(context, "localhost", port, "/", handler);
}
+ public KnativeHttpServer(CamelContext context, Handler<RoutingContext> handler) {
+ this(context, "localhost", AvailablePortFinder.getNextAvailable(), "/", handler);
+ }
+
public KnativeHttpServer(CamelContext context, String host, int port, String path) {
this(context, host, port, path, null);
}
+ public KnativeHttpServer(CamelContext context, String host, String path) {
+ this(context, host, AvailablePortFinder.getNextAvailable(), path, null);
+ }
+
+ public KnativeHttpServer(CamelContext context, String host, String path, Handler<RoutingContext> handler) {
+ this(context, host, AvailablePortFinder.getNextAvailable(), path, handler);
+ }
+
public KnativeHttpServer(CamelContext context, String host, int port, String path, Handler<RoutingContext> handler) {
this.context = context;
this.host = host;
@@ -78,6 +95,18 @@ public class KnativeHttpServer extends ServiceSupport {
};
}
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
public HttpServerRequest poll(int timeout, TimeUnit unit) throws InterruptedException {
return requests.poll(timeout, unit);
}
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index caaf91b..63e18cb 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -57,13 +57,13 @@ import static io.restassured.RestAssured.config;
import static io.restassured.RestAssured.given;
import static io.restassured.config.EncoderConfig.encoderConfig;
import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent;
+import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.httpAttribute;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.event;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceChannel;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEndpoint;
import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEvent;
-import static org.apache.camel.util.CollectionHelper.mapOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.is;
@@ -73,6 +73,7 @@ public class KnativeHttpTest {
private CamelContext context;
private ProducerTemplate template;
private int platformHttpPort;
+ private String platformHttpHost;
// **************************
//
@@ -84,6 +85,7 @@ public class KnativeHttpTest {
public void before() {
this.context = new DefaultCamelContext();
this.template = this.context.createProducerTemplate();
+ this.platformHttpHost = "localhost";
this.platformHttpPort = AvailablePortFinder.getNextAvailable();
PlatformHttpServiceContextCustomizer httpService = new PlatformHttpServiceContextCustomizer();
@@ -124,8 +126,8 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"myEndpoint",
- mapOf(
- Knative.SERVICE_META_PATH, path,
+ Map.of(
+ Knative.SERVICE_META_PATH, ObjectHelper.supplyIfEmpty(path, () -> "/"),
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -160,11 +162,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post(targetPath)
.then()
@@ -206,9 +208,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"myEndpoint",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.SERVICE_META_PATH, "/a/path",
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
@@ -225,12 +227,12 @@ public class KnativeHttpTest {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint");
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "knative://endpoint/myEndpoint");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
- mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
@@ -250,7 +252,7 @@ public class KnativeHttpTest {
"myEndpoint",
"none",
-1,
- mapOf(
+ Map.of(
Knative.SERVICE_META_PATH, "/does/not/exist",
Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort),
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
@@ -268,12 +270,12 @@ public class KnativeHttpTest {
context.start();
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint");
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "knative://endpoint/myEndpoint");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
- mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
- mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
+ mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
mock.expectedBodiesReceived("test");
mock.expectedMessageCount(1);
@@ -291,7 +293,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"myEndpoint",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -318,7 +320,7 @@ public class KnativeHttpTest {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
- mapOf(
+ Map.of(
"cloudEventsVersion", ce.version(),
"eventType", "org.apache.camel.event",
"eventID", "myEventID",
@@ -337,7 +339,7 @@ public class KnativeHttpTest {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
- mapOf(
+ Map.of(
"specversion", ce.version(),
"type", "org.apache.camel.event",
"id", "myEventID",
@@ -356,7 +358,7 @@ public class KnativeHttpTest {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
- mapOf(
+ Map.of(
"specversion", ce.version(),
"type", "org.apache.camel.event",
"id", "myEventID",
@@ -375,7 +377,7 @@ public class KnativeHttpTest {
given()
.contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
.body(
- mapOf(
+ Map.of(
"specversion", ce.version(),
"type", "org.apache.camel.event",
"id", "myEventID",
@@ -405,7 +407,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"myEndpoint",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -431,11 +433,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
@@ -452,17 +454,17 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"ep1",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
- Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"
+ Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1"
)),
sourceEndpoint(
"ep2",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
- Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"
+ Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2"
))
);
@@ -500,11 +502,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1")
.when()
.post()
.then()
@@ -513,11 +515,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2")
.when()
.post()
.then()
@@ -535,17 +537,17 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"ep1",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
- Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[01234]"
+ Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[01234]"
)),
sourceEndpoint(
"ep2",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
- Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[56789]"
+ Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[56789]"
))
);
@@ -583,11 +585,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE0")
.when()
.post()
.then()
@@ -596,11 +598,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE5")
.when()
.post()
.then()
@@ -653,11 +655,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event1")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1")
.when()
.post()
.then()
@@ -666,11 +668,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event2")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2")
.when()
.post()
.then()
@@ -688,17 +690,17 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"from",
- mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Map.of(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.from",
Knative.CONTENT_TYPE, "text/plain"
)),
endpoint(
Knative.EndpointKind.sink,
"to",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
- Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Map.of(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.to",
Knative.CONTENT_TYPE, "text/plain"
))
);
@@ -707,9 +709,9 @@ public class KnativeHttpTest {
b.from("knative:endpoint/from")
.convertBodyTo(String.class)
.setBody()
- .constant("consumer")
+ .constant("consumer")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
- .constant("custom");
+ .constant("custom");
b.from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
@@ -718,7 +720,7 @@ public class KnativeHttpTest {
MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event.to");
mock.expectedMessageCount(1);
context.start();
@@ -735,16 +737,16 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"from",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
endpoint(
Knative.EndpointKind.sink,
"to",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -754,9 +756,9 @@ public class KnativeHttpTest {
b.from("knative:endpoint/from?replyWithCloudEvent=true")
.convertBodyTo(String.class)
.setBody()
- .constant("consumer")
+ .constant("consumer")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
- .constant("custom");
+ .constant("custom");
b.from("direct:source")
.to("knative://endpoint/to")
.log("${body}")
@@ -765,7 +767,7 @@ public class KnativeHttpTest {
MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
mock.expectedBodiesReceived("consumer");
- mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
+ mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "custom");
mock.expectedMessageCount(1);
context.start();
@@ -785,7 +787,7 @@ public class KnativeHttpTest {
"test",
"",
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -815,9 +817,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"test",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -846,7 +848,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"ep1",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
@@ -854,7 +856,7 @@ public class KnativeHttpTest {
),
sourceEndpoint(
"ep2",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
@@ -897,7 +899,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"ep1",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
@@ -905,7 +907,7 @@ public class KnativeHttpTest {
),
sourceEndpoint(
"ep2",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
@@ -951,9 +953,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -989,15 +991,15 @@ public class KnativeHttpTest {
event(
Knative.EndpointKind.sink,
"default",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"default",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -1035,9 +1037,9 @@ public class KnativeHttpTest {
event(
Knative.EndpointKind.sink,
"default",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_KIND, "MyObject",
@@ -1045,7 +1047,7 @@ public class KnativeHttpTest {
)),
sourceEvent(
"default",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_KIND, "MyOtherObject",
@@ -1084,7 +1086,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"myEndpoint",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_KIND, "MyObject",
@@ -1092,7 +1094,7 @@ public class KnativeHttpTest {
)),
sourceEndpoint(
"myEndpoint",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_KIND, "MyObject",
@@ -1120,11 +1122,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
@@ -1141,7 +1143,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"myEndpoint",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -1172,9 +1174,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"myEndpoint",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -1196,8 +1198,7 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testNoContent(CloudEvent ce) throws Exception {
- final int wordsPort = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, wordsPort, event -> {
+ final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
event.response().setStatusCode(204);
event.response().end("");
});
@@ -1210,25 +1211,25 @@ public class KnativeHttpTest {
"messages",
null,
-1,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
channel(
Knative.EndpointKind.sink,
"messages",
- "localhost",
+ platformHttpHost,
platformHttpPort,
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)),
channel(
Knative.EndpointKind.sink,
"words",
- "localhost",
- wordsPort,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -1278,11 +1279,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
@@ -1315,11 +1316,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
@@ -1352,11 +1353,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
@@ -1373,7 +1374,7 @@ public class KnativeHttpTest {
.limit(10)
.mapToObj(i -> sourceEndpoint(
"ep-" + i,
- mapOf(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i)))
+ Map.of(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i)))
.collect(Collectors.toList());
configureKnativeComponent(context, ce, hops);
@@ -1413,7 +1414,7 @@ public class KnativeHttpTest {
@EnumSource(CloudEvents.class)
void testHeaders(CloudEvent ce) throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
+ final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
@@ -1421,9 +1422,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -1432,19 +1433,31 @@ public class KnativeHttpTest {
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
- .to("knative:endpoint/ep");
+ .setHeader("CamelDummyHeader").constant("test")
+ .to("knative:endpoint/ep")
+ .to("direct:mock");
+ b.from("direct:mock")
+ .to("mock:ep");
});
context.start();
+
try {
+ MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class);
+ mock.expectedHeaderReceived("CamelDummyHeader", "test");
+ mock.expectedMessageCount(1);
+
server.start();
+
template.sendBody("direct:start", "");
+ mock.assertIsSatisfied();
+
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event");
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("org.apache.camel.event");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1453,12 +1466,54 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
- void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
+ void testHeadersInReply(CloudEvent ce) throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
- final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+ final KnativeHttpServer server = new KnativeHttpServer(context);
+
+ configureKnativeComponent(
+ context,
+ ce,
+ endpoint(
+ Knative.EndpointKind.sink,
+ "ep",
+ server.getHost(),
+ server.getPort(),
+ Map.of(
+ Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+ Knative.CONTENT_TYPE, "text/plain"
+ )
+ )
+ );
+
+ RouteBuilder.addRoutes(context, b -> {
+ b.from("direct:start")
+ .setHeader("CamelDummyHeader").constant("test")
+ .to("knative:endpoint/ep");
+ });
+
+ context.start();
+
+ try {
+ MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class);
+ mock.expectedHeaderReceived("CamelDummyHeader", "test");
+ mock.expectedMessageCount(1);
+
+ server.start();
+
+ Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(null));
+ assertThat(exchange.getMessage().getHeaders()).containsEntry("CamelDummyHeader", "test");
+ } finally {
+ server.stop();
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(CloudEvents.class)
+ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
+ final KnativeHttpServer server = new KnativeHttpServer(context);
+ final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
final String typeHeaderVal = UUID.randomUUID().toString();
- final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+ final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
final String sourceHeaderVal = UUID.randomUUID().toString();
configureKnativeComponent(
@@ -1467,9 +1522,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain",
Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
@@ -1489,10 +1544,10 @@ public class KnativeHttpTest {
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1502,11 +1557,10 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
- final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+ final KnativeHttpServer server = new KnativeHttpServer(context);
+ final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
final String typeHeaderVal = UUID.randomUUID().toString();
- final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+ final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
final String sourceHeaderVal = UUID.randomUUID().toString();
configureKnativeComponent(
@@ -1515,9 +1569,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -1537,10 +1591,10 @@ public class KnativeHttpTest {
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1550,11 +1604,10 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
- final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+ final KnativeHttpServer server = new KnativeHttpServer(context);
+ final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
final String typeHeaderVal = UUID.randomUUID().toString();
- final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+ final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
final String sourceHeaderVal = UUID.randomUUID().toString();
KnativeComponent component = configureKnativeComponent(
@@ -1563,16 +1616,16 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);
- component.getConfiguration().setCeOverride(mapOf(
+ component.getConfiguration().setCeOverride(Map.of(
Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
));
@@ -1588,10 +1641,10 @@ public class KnativeHttpTest {
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1601,8 +1654,7 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
+ final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
@@ -1610,9 +1662,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -1631,10 +1683,10 @@ public class KnativeHttpTest {
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("myType");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1644,8 +1696,7 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
+ final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
@@ -1653,9 +1704,9 @@ public class KnativeHttpTest {
endpoint(
Knative.EndpointKind.sink,
"ep",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
@@ -1664,7 +1715,7 @@ public class KnativeHttpTest {
RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
- .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader")
+ .setHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE)).constant("fromCEHeader")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader")
.to("knative:endpoint/ep");
});
@@ -1675,10 +1726,10 @@ public class KnativeHttpTest {
template.sendBody("direct:start", "");
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("fromCEHeader");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1688,8 +1739,7 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testEventBridge(CloudEvent ce) throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
+ final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
@@ -1697,14 +1747,14 @@ public class KnativeHttpTest {
event(
Knative.EndpointKind.sink,
"event.sink",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"event.source",
- mapOf(
+ Map.of(
Knative.CONTENT_TYPE, "text/plain"
))
);
@@ -1722,19 +1772,19 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204);
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1745,7 +1795,7 @@ public class KnativeHttpTest {
@EnumSource(CloudEvents.class)
void testDynamicEventBridge(CloudEvent ce) throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port);
+ final KnativeHttpServer server = new KnativeHttpServer(context);
configureKnativeComponent(
context,
@@ -1753,14 +1803,14 @@ public class KnativeHttpTest {
event(
Knative.EndpointKind.sink,
"default",
- "localhost",
- port,
- mapOf(
+ server.getHost(),
+ server.getPort(),
+ Map.of(
Knative.CONTENT_TYPE, "text/plain"
)),
sourceEvent(
"event.source",
- mapOf(
+ Map.of(
Knative.CONTENT_TYPE, "text/plain"
))
);
@@ -1779,19 +1829,19 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
.statusCode(204);
HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
- assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink");
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+ assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink");
assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
@@ -1801,8 +1851,7 @@ public class KnativeHttpTest {
@ParameterizedTest
@EnumSource(CloudEvents.class)
void testSlowConsumer(CloudEvent ce) throws Exception {
- final int port = AvailablePortFinder.getNextAvailable();
- final KnativeHttpServer server = new KnativeHttpServer(context, port, event -> {
+ final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
event.vertx().executeBlocking(
promise -> {
try {
@@ -1825,7 +1874,7 @@ public class KnativeHttpTest {
ce,
sourceEndpoint(
"start",
- mapOf(
+ Map.of(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
))
@@ -1837,7 +1886,7 @@ public class KnativeHttpTest {
RouteBuilder.addRoutes(context, b -> {
b.from("knative:endpoint/start")
.removeHeaders("Camel*")
- .toF("http://localhost:%d", port);
+ .toF("http://%s:%d", server.getHost(), server.getPort());
});
context.start();
@@ -1845,11 +1894,11 @@ public class KnativeHttpTest {
given()
.body("test")
.header(Exchange.CONTENT_TYPE, "text/plain")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
- .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+ .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
.when()
.post()
.then()
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
index 8bf3702..bbe209d 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
@@ -39,4 +39,8 @@ public final class KnativeHttpTestSupport {
return component;
}
+
+ public static String httpAttribute(CloudEvent ce, String name) {
+ return ce.mandatoryAttribute(name).http();
+ }
}
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java
new file mode 100644
index 0000000..19250b4
--- /dev/null
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http.assertions;
+
+import java.util.Objects;
+
+import io.vertx.core.http.HttpServerRequest;
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.AbstractStringAssert;
+import org.assertj.core.api.AssertionsForClassTypes;
+
+public class HttpServerRequestAssert extends AbstractAssert<HttpServerRequestAssert, HttpServerRequest> {
+ public HttpServerRequestAssert(HttpServerRequest request) {
+ super(request, HttpServerRequest.class);
+ }
+
+ public static HttpServerRequestAssert assertThat(HttpServerRequest actual) {
+ return new HttpServerRequestAssert(actual);
+ }
+
+
+ public AbstractStringAssert<?> header(String name) {
+ isNotNull();
+
+ return AssertionsForClassTypes.assertThat(actual.getHeader(name));
+ }
+
+ public HttpServerRequestAssert hasHeader(String name) {
+ isNotNull();
+
+ if (Objects.isNull(actual.getHeader(name))) {
+ //failWithMessage("Expected header name to be <%s> but was <%s>", name, actual.getName());
+ failWithMessage("Expected header %s not present", name);
+ }
+
+ return this;
+ }
+
+ public HttpServerRequestAssert hasHeader(String name, String value) {
+ isNotNull();
+
+ if (Objects.isNull(actual.getHeader(name))) {
+ failWithMessage("Expected header %s not present", name);
+ }
+
+ if (Objects.equals(actual.getHeader(name), value)) {
+ failWithMessage("Expected header %s to be <%s> but was <%s>", name, value, actual.getHeader(name));
+ }
+
+ return this;
+ }
+}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index b38f510..0e2b235 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -128,7 +128,7 @@ public class KnativeConfiguration implements Cloneable {
* Set the transport options.
*/
public void setTransportOptions(Map<String, Object> transportOptions) {
- this.transportOptions = transportOptions;
+ this.transportOptions = new HashMap<>(transportOptions);
}
/**
@@ -150,7 +150,7 @@ public class KnativeConfiguration implements Cloneable {
* Set the filters.
*/
public void setFilters(Map<String, Object> filters) {
- this.filters = filters;
+ this.filters = new HashMap<>(filters);
}
public Map<String, Object> getCeOverride() {
@@ -161,7 +161,7 @@ public class KnativeConfiguration implements Cloneable {
* CloudEvent headers to override
*/
public void setCeOverride(Map<String, Object> ceOverride) {
- this.ceOverride = ceOverride;
+ this.ceOverride = new HashMap<>(ceOverride);
}
public String getApiVersion() {