You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2022/04/28 10:05:59 UTC

[camel-quarkus] 05/06: Fix MockEndpoint usage in gRPC tests

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch 2.7.x
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit f84498aa94f879127cef878d5713d759ee185d5d
Author: James Netherton <ja...@gmail.com>
AuthorDate: Wed Apr 27 07:58:01 2022 +0100

    Fix MockEndpoint usage in gRPC tests
---
 integration-tests/grpc/pom.xml                     |   8 ++
 .../quarkus/component/grpc/it/GrpcResource.java    | 124 +++++++++++----------
 .../camel/quarkus/component/grpc/it/GrpcRoute.java |  16 ++-
 .../component/grpc/it/MessageOriginProcessor.java  |  38 +++++++
 .../camel/quarkus/component/grpc/it/GrpcTest.java  |  91 ++++++++++++---
 5 files changed, 202 insertions(+), 75 deletions(-)

diff --git a/integration-tests/grpc/pom.xml b/integration-tests/grpc/pom.xml
index 4cf066b02c..87885a21ab 100644
--- a/integration-tests/grpc/pom.xml
+++ b/integration-tests/grpc/pom.xml
@@ -55,6 +55,10 @@
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-resteasy</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jsonb</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
@@ -78,6 +82,10 @@
             <artifactId>camel-quarkus-integration-test-support</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
index 3b14996fb8..21f91258f9 100644
--- a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.quarkus.component.grpc.it;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
@@ -35,11 +37,6 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
 import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
 
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_HEADER;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
 import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.PING_PONG_SERVICE;
 
 @Path("/grpc")
@@ -69,96 +66,107 @@ public class GrpcResource {
 
     @Path("/forwardOnCompleted")
     @GET
-    public void forwardOnCompleted() throws InterruptedException {
-        MockEndpoint endpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
-        endpoint.expectedMessageCount(1);
-        endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_COMPLETED);
-        endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
-        endpoint.assertIsSatisfied(5000L);
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Object> forwardOnCompleted() throws InterruptedException {
+        MockEndpoint mockEndpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
+        List<Exchange> exchanges = mockEndpoint.getExchanges();
+        if (!exchanges.isEmpty()) {
+            Exchange exchange = exchanges.get(0);
+            return exchange.getMessage().getHeaders();
+        }
+        return Collections.emptyMap();
     }
 
     @Path("/forwardOnError")
     @GET
-    public String forwardOnError() throws InterruptedException {
-        MockEndpoint endpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
-        endpoint.expectedMessageCount(1);
-        endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_ERROR);
-        endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
-        endpoint.assertIsSatisfied(5000L);
-
-        List<Exchange> exchanges = endpoint.getExchanges();
-        Exchange exchange = exchanges.get(0);
-        Throwable throwable = exchange.getMessage().getBody(Throwable.class);
-        return throwable.getClass().getName();
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Object> forwardOnError() throws InterruptedException {
+        MockEndpoint mockEndpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
+        List<Exchange> exchanges = mockEndpoint.getExchanges();
+        if (!exchanges.isEmpty()) {
+            Exchange exchange = exchanges.get(0);
+            Throwable throwable = exchange.getMessage().getBody(Throwable.class);
+            Map<String, Object> results = exchange.getMessage().getHeaders();
+            results.put("error", throwable.getClass().getName());
+            return results;
+        }
+        return Collections.emptyMap();
     }
 
     @Path("/grpcStreamReplies")
     @GET
     public void grpcStreamReplies() throws InterruptedException {
         int messageCount = 10;
+        MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
+        endpoint.expectedMessageCount(messageCount);
+
         for (int i = 1; i <= messageCount; i++) {
             PingRequest request = PingRequest.newBuilder().setPingName(String.valueOf(i)).build();
             producerTemplate.sendBody("direct:grpcStream", request);
         }
 
-        MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
-        endpoint.expectedMessageCount(messageCount);
         endpoint.assertIsSatisfied();
     }
 
     @Path("/tls")
     @GET
-    public void tlsConsumer() throws InterruptedException {
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Object> tlsConsumer() throws InterruptedException {
         MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
-        mockEndpoint.expectedMessageCount(1);
-        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
-        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
-        mockEndpoint.assertIsSatisfied();
+        List<Exchange> exchanges = mockEndpoint.getExchanges();
+        if (!exchanges.isEmpty()) {
+            Exchange exchange = exchanges.get(0);
+            return exchange.getMessage().getHeaders();
+        }
+        return Collections.emptyMap();
     }
 
     @Path("/tls")
     @POST
     @Produces(MediaType.TEXT_PLAIN)
     public String tlsProducer(String message) {
-        MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
-        try {
-            PingRequest pingRequest = PingRequest.newBuilder()
-                    .setPingName(message)
-                    .setPingId(12345)
-                    .build();
-
-            PongResponse response = producerTemplate.requestBody("direct:sendTls", pingRequest, PongResponse.class);
-            return response.getPongName();
-        } finally {
-            mockEndpoint.reset();
-        }
+        PingRequest pingRequest = PingRequest.newBuilder()
+                .setPingName(message)
+                .setPingId(12345)
+                .build();
+
+        PongResponse response = producerTemplate.requestBodyAndHeader(
+                "direct:sendTls",
+                pingRequest,
+                "origin",
+                "producer",
+                PongResponse.class);
+        return response.getPongName();
     }
 
     @Path("/jwt")
     @GET
-    public void jwtConsumer() throws InterruptedException {
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Object> jwtConsumer() throws InterruptedException {
         MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
-        mockEndpoint.expectedMessageCount(1);
-        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
-        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
-        mockEndpoint.assertIsSatisfied();
+        List<Exchange> exchanges = mockEndpoint.getExchanges();
+        if (!exchanges.isEmpty()) {
+            Exchange exchange = exchanges.get(0);
+            return exchange.getMessage().getHeaders();
+        }
+        return Collections.emptyMap();
     }
 
     @Path("/jwt")
     @POST
     @Produces(MediaType.TEXT_PLAIN)
     public String jwtProducer(String message) {
-        MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
-        try {
-            PingRequest pingRequest = PingRequest.newBuilder()
-                    .setPingName(message)
-                    .setPingId(12345)
-                    .build();
-
-            PongResponse response = producerTemplate.requestBody("direct:sendJwt", pingRequest, PongResponse.class);
-            return response.getPongName();
-        } finally {
-            mockEndpoint.reset();
-        }
+        PingRequest pingRequest = PingRequest.newBuilder()
+                .setPingName(message)
+                .setPingId(12345)
+                .build();
+
+        PongResponse response = producerTemplate.requestBodyAndHeader(
+                "direct:sendJwt",
+                pingRequest,
+                "origin",
+                "producer",
+                PongResponse.class);
+        return response.getPongName();
     }
 }
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
index c9b083813a..ace3285a62 100644
--- a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
@@ -79,8 +79,14 @@ public class GrpcRoute extends RouteBuilder {
                 + "/%s?consumerStrategy=PROPAGATION&"
                 + "negotiationType=TLS&keyCertChainResource=certs/server.pem&"
                 + "keyResource=certs/server.key&trustCertCollectionResource=certs/ca.pem", PING_PONG_SERVICE)
+                        .process("messageOriginProcessor")
+                        .choice()
+                        .when(simple("${header.origin} == 'producer'"))
+                        .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse")
+                        .endChoice()
+                        .otherwise()
                         .to("mock:tls")
-                        .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+                        .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse").endChoice();
 
         from("direct:sendTls")
                 .toF("grpc://localhost:{{camel.grpc.test.tls.server.port}}"
@@ -91,8 +97,14 @@ public class GrpcRoute extends RouteBuilder {
         fromF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
                 + "/%s?consumerStrategy=PROPAGATION&"
                 + "authenticationType=JWT&jwtSecret=%s", PING_PONG_SERVICE, GRPC_JWT_SECRET)
+                        .process("messageOriginProcessor")
+                        .choice()
+                        .when(simple("${header.origin} == 'producer'"))
+                        .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse")
+                        .endChoice()
+                        .otherwise()
                         .to("mock:jwt")
-                        .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+                        .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse").endChoice();
 
         from("direct:sendJwt")
                 .toF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/MessageOriginProcessor.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/MessageOriginProcessor.java
new file mode 100644
index 0000000000..47832337b5
--- /dev/null
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/MessageOriginProcessor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.grpc.it;
+
+import javax.inject.Named;
+import javax.inject.Singleton;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
+
+@Singleton
+@Named
+public class MessageOriginProcessor implements Processor {
+    @Override
+    public void process(Exchange exchange) {
+        Message message = exchange.getMessage();
+        PingRequest request = message.getBody(PingRequest.class);
+        if (request.getPingName().endsWith("producer")) {
+            message.setHeader("origin", "producer");
+        }
+    }
+}
diff --git a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
index 6498810bba..eaae495205 100644
--- a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
+++ b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
@@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver;
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
+import io.restassured.path.json.JsonPath;
 import org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm;
 import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
 import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
@@ -35,12 +36,18 @@ import org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc;
 import org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc.PingPongBlockingStub;
 import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
 import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
+import org.awaitility.Awaitility;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_HEADER;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
 import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.GRPC_JWT_SECRET;
 import static org.apache.camel.quarkus.component.grpc.it.PingPongImpl.GRPC_TEST_PONG_VALUE;
 import static org.hamcrest.Matchers.equalTo;
@@ -109,9 +116,22 @@ class GrpcTest {
 
             latch.await(5, TimeUnit.SECONDS);
 
-            RestAssured.get("/grpc/forwardOnCompleted")
-                    .then()
-                    .statusCode(204);
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+                JsonPath json = RestAssured.get("/grpc/forwardOnCompleted")
+                        .then()
+                        .statusCode(200)
+                        .extract()
+                        .body()
+                        .jsonPath();
+
+                String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+                String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+
+                return eventType != null
+                        && eventType.equals(GRPC_EVENT_TYPE_ON_COMPLETED)
+                        && methodName != null
+                        && methodName.equals("pingAsyncAsync");
+            });
         } finally {
             channel.shutdownNow();
         }
@@ -133,10 +153,25 @@ class GrpcTest {
 
             latch.await(5, TimeUnit.SECONDS);
 
-            RestAssured.get("/grpc/forwardOnError")
-                    .then()
-                    .statusCode(200)
-                    .body(is(StatusRuntimeException.class.getName()));
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+                JsonPath json = RestAssured.get("/grpc/forwardOnError")
+                        .then()
+                        .statusCode(200)
+                        .extract()
+                        .body()
+                        .jsonPath();
+
+                String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+                String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+                String error = json.getString("error");
+
+                return error != null
+                        && error.equals(StatusRuntimeException.class.getName())
+                        && eventType != null
+                        && eventType.equals(GRPC_EVENT_TYPE_ON_ERROR)
+                        && methodName != null
+                        && methodName.equals("pingAsyncAsync");
+            });
         } finally {
             channel.shutdownNow();
         }
@@ -205,9 +240,22 @@ class GrpcTest {
             requestObserver.onNext(pingRequest);
             latch.await(5, TimeUnit.SECONDS);
 
-            RestAssured.get("/grpc/tls")
-                    .then()
-                    .statusCode(204);
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+                JsonPath json = RestAssured.get("/grpc/tls")
+                        .then()
+                        .statusCode(200)
+                        .extract()
+                        .body()
+                        .jsonPath();
+
+                String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+                String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+
+                return eventType != null
+                        && eventType.equals(GRPC_EVENT_TYPE_ON_NEXT)
+                        && methodName != null
+                        && methodName.equals("pingAsyncSync");
+            });
 
             PongResponse pongResponse = responseObserver.getPongResponse();
             assertNotNull(pongResponse);
@@ -222,7 +270,7 @@ class GrpcTest {
 
     @Test
     public void tlsProducer() {
-        String message = GRPC_TEST_PING_VALUE + " TLS";
+        String message = GRPC_TEST_PING_VALUE + " TLS producer";
         RestAssured.given()
                 .body(message)
                 .post("/grpc/tls")
@@ -255,9 +303,22 @@ class GrpcTest {
             requestObserver.onNext(pingRequest);
             latch.await(5, TimeUnit.SECONDS);
 
-            RestAssured.get("/grpc/jwt")
-                    .then()
-                    .statusCode(204);
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+                JsonPath json = RestAssured.get("/grpc/jwt")
+                        .then()
+                        .statusCode(200)
+                        .extract()
+                        .body()
+                        .jsonPath();
+
+                String eventType = json.getString(GRPC_EVENT_TYPE_HEADER);
+                String methodName = json.getString(GRPC_METHOD_NAME_HEADER);
+
+                return eventType != null
+                        && eventType.equals(GRPC_EVENT_TYPE_ON_NEXT)
+                        && methodName != null
+                        && methodName.equals("pingAsyncSync");
+            });
 
             PongResponse pongResponse = responseObserver.getPongResponse();
             assertNotNull(pongResponse);
@@ -272,7 +333,7 @@ class GrpcTest {
 
     @Test
     public void jwtProducer() {
-        String message = GRPC_TEST_PING_VALUE + " JWT";
+        String message = GRPC_TEST_PING_VALUE + " JWT producer";
         RestAssured.given()
                 .body(message)
                 .post("/grpc/jwt")