You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2022/04/28 10:05:59 UTC
[camel-quarkus] 05/06: Fix MockEndpoint usage in gRPC tests
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch 2.7.x
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit f84498aa94f879127cef878d5713d759ee185d5d
Author: James Netherton <ja...@gmail.com>
AuthorDate: Wed Apr 27 07:58:01 2022 +0100
Fix MockEndpoint usage in gRPC tests
---
integration-tests/grpc/pom.xml | 8 ++
.../quarkus/component/grpc/it/GrpcResource.java | 124 +++++++++++----------
.../camel/quarkus/component/grpc/it/GrpcRoute.java | 16 ++-
.../component/grpc/it/MessageOriginProcessor.java | 38 +++++++
.../camel/quarkus/component/grpc/it/GrpcTest.java | 91 ++++++++++++---
5 files changed, 202 insertions(+), 75 deletions(-)
diff --git a/integration-tests/grpc/pom.xml b/integration-tests/grpc/pom.xml
index 4cf066b02c..87885a21ab 100644
--- a/integration-tests/grpc/pom.xml
+++ b/integration-tests/grpc/pom.xml
@@ -55,6 +55,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jsonb</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -78,6 +82,10 @@
<artifactId>camel-quarkus-integration-test-support</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
index 3b14996fb8..21f91258f9 100644
--- a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.quarkus.component.grpc.it;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@@ -35,11 +37,6 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_HEADER;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.PING_PONG_SERVICE;
@Path("/grpc")
@@ -69,96 +66,107 @@ public class GrpcResource {
@Path("/forwardOnCompleted")
@GET
- public void forwardOnCompleted() throws InterruptedException {
- MockEndpoint endpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
- endpoint.expectedMessageCount(1);
- endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_COMPLETED);
- endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
- endpoint.assertIsSatisfied(5000L);
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Object> forwardOnCompleted() throws InterruptedException {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
+ List<Exchange> exchanges = mockEndpoint.getExchanges();
+ if (!exchanges.isEmpty()) {
+ Exchange exchange = exchanges.get(0);
+ return exchange.getMessage().getHeaders();
+ }
+ return Collections.emptyMap();
}
@Path("/forwardOnError")
@GET
- public String forwardOnError() throws InterruptedException {
- MockEndpoint endpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
- endpoint.expectedMessageCount(1);
- endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_ERROR);
- endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
- endpoint.assertIsSatisfied(5000L);
-
- List<Exchange> exchanges = endpoint.getExchanges();
- Exchange exchange = exchanges.get(0);
- Throwable throwable = exchange.getMessage().getBody(Throwable.class);
- return throwable.getClass().getName();
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Object> forwardOnError() throws InterruptedException {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
+ List<Exchange> exchanges = mockEndpoint.getExchanges();
+ if (!exchanges.isEmpty()) {
+ Exchange exchange = exchanges.get(0);
+ Throwable throwable = exchange.getMessage().getBody(Throwable.class);
+ Map<String, Object> results = exchange.getMessage().getHeaders();
+ results.put("error", throwable.getClass().getName());
+ return results;
+ }
+ return Collections.emptyMap();
}
@Path("/grpcStreamReplies")
@GET
public void grpcStreamReplies() throws InterruptedException {
int messageCount = 10;
+ MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
+ endpoint.expectedMessageCount(messageCount);
+
for (int i = 1; i <= messageCount; i++) {
PingRequest request = PingRequest.newBuilder().setPingName(String.valueOf(i)).build();
producerTemplate.sendBody("direct:grpcStream", request);
}
- MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
- endpoint.expectedMessageCount(messageCount);
endpoint.assertIsSatisfied();
}
@Path("/tls")
@GET
- public void tlsConsumer() throws InterruptedException {
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Object> tlsConsumer() throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
- mockEndpoint.expectedMessageCount(1);
- mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
- mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
- mockEndpoint.assertIsSatisfied();
+ List<Exchange> exchanges = mockEndpoint.getExchanges();
+ if (!exchanges.isEmpty()) {
+ Exchange exchange = exchanges.get(0);
+ return exchange.getMessage().getHeaders();
+ }
+ return Collections.emptyMap();
}
@Path("/tls")
@POST
@Produces(MediaType.TEXT_PLAIN)
public String tlsProducer(String message) {
- MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
- try {
- PingRequest pingRequest = PingRequest.newBuilder()
- .setPingName(message)
- .setPingId(12345)
- .build();
-
- PongResponse response = producerTemplate.requestBody("direct:sendTls", pingRequest, PongResponse.class);
- return response.getPongName();
- } finally {
- mockEndpoint.reset();
- }
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(message)
+ .setPingId(12345)
+ .build();
+
+ PongResponse response = producerTemplate.requestBodyAndHeader(
+ "direct:sendTls",
+ pingRequest,
+ "origin",
+ "producer",
+ PongResponse.class);
+ return response.getPongName();
}
@Path("/jwt")
@GET
- public void jwtConsumer() throws InterruptedException {
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Object> jwtConsumer() throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
- mockEndpoint.expectedMessageCount(1);
- mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
- mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
- mockEndpoint.assertIsSatisfied();
+ List<Exchange> exchanges = mockEndpoint.getExchanges();
+ if (!exchanges.isEmpty()) {
+ Exchange exchange = exchanges.get(0);
+ return exchange.getMessage().getHeaders();
+ }
+ return Collections.emptyMap();
}
@Path("/jwt")
@POST
@Produces(MediaType.TEXT_PLAIN)
public String jwtProducer(String message) {
- MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
- try {
- PingRequest pingRequest = PingRequest.newBuilder()
- .setPingName(message)
- .setPingId(12345)
- .build();
-
- PongResponse response = producerTemplate.requestBody("direct:sendJwt", pingRequest, PongResponse.class);
- return response.getPongName();
- } finally {
- mockEndpoint.reset();
- }
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(message)
+ .setPingId(12345)
+ .build();
+
+ PongResponse response = producerTemplate.requestBodyAndHeader(
+ "direct:sendJwt",
+ pingRequest,
+ "origin",
+ "producer",
+ PongResponse.class);
+ return response.getPongName();
}
}
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
index c9b083813a..ace3285a62 100644
--- a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
@@ -79,8 +79,14 @@ public class GrpcRoute extends RouteBuilder {
+ "/%s?consumerStrategy=PROPAGATION&"
+ "negotiationType=TLS&keyCertChainResource=certs/server.pem&"
+ "keyResource=certs/server.key&trustCertCollectionResource=certs/ca.pem", PING_PONG_SERVICE)
+ .process("messageOriginProcessor")
+ .choice()
+ .when(simple("${header.origin} == 'producer'"))
+ .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse")
+ .endChoice()
+ .otherwise()
.to("mock:tls")
- .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+ .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse").endChoice();
from("direct:sendTls")
.toF("grpc://localhost:{{camel.grpc.test.tls.server.port}}"
@@ -91,8 +97,14 @@ public class GrpcRoute extends RouteBuilder {
fromF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
+ "/%s?consumerStrategy=PROPAGATION&"
+ "authenticationType=JWT&jwtSecret=%s", PING_PONG_SERVICE, GRPC_JWT_SECRET)
+ .process("messageOriginProcessor")
+ .choice()
+ .when(simple("${header.origin} == 'producer'"))
+ .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse")
+ .endChoice()
+ .otherwise()
.to("mock:jwt")
- .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+ .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse").endChoice();
from("direct:sendJwt")
.toF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/MessageOriginProcessor.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/MessageOriginProcessor.java
new file mode 100644
index 0000000000..47832337b5
--- /dev/null
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/MessageOriginProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.grpc.it;
+
+import javax.inject.Named;
+import javax.inject.Singleton;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
+
+@Singleton
+@Named
+public class MessageOriginProcessor implements Processor {
+ @Override
+ public void process(Exchange exchange) {
+ Message message = exchange.getMessage();
+ PingRequest request = message.getBody(PingRequest.class);
+ if (request.getPingName().endsWith("producer")) {
+ message.setHeader("origin", "producer");
+ }
+ }
+}
diff --git a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
index 6498810bba..eaae495205 100644
--- a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
+++ b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
@@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
+import io.restassured.path.json.JsonPath;
import org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm;
import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
@@ -35,12 +36,18 @@ import org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc;
import org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc.PingPongBlockingStub;
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
+import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_HEADER;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.GRPC_JWT_SECRET;
import static org.apache.camel.quarkus.component.grpc.it.PingPongImpl.GRPC_TEST_PONG_VALUE;
import static org.hamcrest.Matchers.equalTo;
@@ -109,9 +116,22 @@ class GrpcTest {
latch.await(5, TimeUnit.SECONDS);
- RestAssured.get("/grpc/forwardOnCompleted")
- .then()
- .statusCode(204);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ JsonPath json = RestAssured.get("/grpc/forwardOnCompleted")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .jsonPath();
+
+ String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+ String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+
+ return eventType != null
+ && eventType.equals(GRPC_EVENT_TYPE_ON_COMPLETED)
+ && methodName != null
+ && methodName.equals("pingAsyncAsync");
+ });
} finally {
channel.shutdownNow();
}
@@ -133,10 +153,25 @@ class GrpcTest {
latch.await(5, TimeUnit.SECONDS);
- RestAssured.get("/grpc/forwardOnError")
- .then()
- .statusCode(200)
- .body(is(StatusRuntimeException.class.getName()));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ JsonPath json = RestAssured.get("/grpc/forwardOnError")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .jsonPath();
+
+ String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+ String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+ String error = json.getString("error");
+
+ return error != null
+ && error.equals(StatusRuntimeException.class.getName())
+ && eventType != null
+ && eventType.equals(GRPC_EVENT_TYPE_ON_ERROR)
+ && methodName != null
+ && methodName.equals("pingAsyncAsync");
+ });
} finally {
channel.shutdownNow();
}
@@ -205,9 +240,22 @@ class GrpcTest {
requestObserver.onNext(pingRequest);
latch.await(5, TimeUnit.SECONDS);
- RestAssured.get("/grpc/tls")
- .then()
- .statusCode(204);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ JsonPath json = RestAssured.get("/grpc/tls")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .jsonPath();
+
+ String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+ String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+
+ return eventType != null
+ && eventType.equals(GRPC_EVENT_TYPE_ON_NEXT)
+ && methodName != null
+ && methodName.equals("pingAsyncSync");
+ });
PongResponse pongResponse = responseObserver.getPongResponse();
assertNotNull(pongResponse);
@@ -222,7 +270,7 @@ class GrpcTest {
@Test
public void tlsProducer() {
- String message = GRPC_TEST_PING_VALUE + " TLS";
+ String message = GRPC_TEST_PING_VALUE + " TLS producer";
RestAssured.given()
.body(message)
.post("/grpc/tls")
@@ -255,9 +303,22 @@ class GrpcTest {
requestObserver.onNext(pingRequest);
latch.await(5, TimeUnit.SECONDS);
- RestAssured.get("/grpc/jwt")
- .then()
- .statusCode(204);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ JsonPath json = RestAssured.get("/grpc/jwt")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .jsonPath();
+
+ String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+ String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+
+ return eventType != null
+ && eventType.equals(GRPC_EVENT_TYPE_ON_NEXT)
+ && methodName != null
+ && methodName.equals("pingAsyncSync");
+ });
PongResponse pongResponse = responseObserver.getPongResponse();
assertNotNull(pongResponse);
@@ -272,7 +333,7 @@ class GrpcTest {
@Test
public void jwtProducer() {
- String message = GRPC_TEST_PING_VALUE + " JWT";
+ String message = GRPC_TEST_PING_VALUE + " JWT producer";
RestAssured.given()
.body(message)
.post("/grpc/jwt")