You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2021/07/29 06:08:42 UTC

[camel-quarkus] 01/10: Test rollback() EIP #2628

This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit c1e13bad0419003f876d39fa412c973a8902831d
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Fri Jul 23 18:57:11 2021 +0200

    Test rollback() EIP #2628
---
 .../quarkus/component/jta/it/JtaResource.java      | 70 +++++++---------------
 .../camel/quarkus/component/jta/it/JtaRoutes.java  | 45 ++++++++++++--
 .../camel/quarkus/component/jta/it/JtaTest.java    | 54 +++++++++++------
 3 files changed, 96 insertions(+), 73 deletions(-)

diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
index 588f8d9..c67c9b9 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
@@ -20,10 +20,10 @@ import java.net.URI;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.List;
+import java.util.stream.Collectors;
 
-import javax.annotation.PostConstruct;
 import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
 import javax.inject.Inject;
 import javax.transaction.Transactional;
 import javax.ws.rs.Consumes;
@@ -38,10 +38,9 @@ import javax.ws.rs.core.Response;
 import io.agroal.api.AgroalDataSource;
 import io.quarkus.agroal.DataSource;
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.quarkus.main.events.AfterStart;
 import org.jboss.logging.Logger;
 
 @Path("/jta")
@@ -59,16 +58,16 @@ public class JtaResource {
     @Inject
     CamelContext context;
 
-    @PostConstruct
-    void postConstruct() throws SQLException {
+    void postConstruct(@Observes AfterStart event) throws SQLException {
         try (Connection conn = dataSource.getConnection()) {
             try (Statement statement = conn.createStatement()) {
+                LOG.info("Recreating table 'example'");
                 try {
                     statement.execute("drop table example");
                 } catch (Exception ignored) {
                 }
                 statement.execute(
-                        "create table example (id serial primary key, message varchar(255) not null, origin varchar(5) not null)");
+                        "create table example (id serial primary key, message varchar(255) not null, origin varchar(255) not null)");
             }
         }
     }
@@ -96,60 +95,31 @@ public class JtaResource {
         return post(policy, message);
     }
 
-    @Path("/jdbc")
+    @Path("/route/{route}")
     @POST
     @Consumes(MediaType.TEXT_PLAIN)
     @Produces(MediaType.TEXT_PLAIN)
-    public Response jdbc(String message) throws Exception {
-        String response = request("direct:jdbc", message);
-        LOG.infof("Got response from jdbc: %s", response);
+    public Response route(@PathParam("route") String route, String message) throws Exception {
+        LOG.infof("message is %s", message);
+        String response = producerTemplate.requestBody("direct:" + route, message, String.class);
+        LOG.infof("Got response from %s: %s", route, response);
         return Response
                 .created(new URI("https://camel.apache.org/"))
                 .entity(response)
                 .build();
     }
 
-    @Path("/sqltx")
-    @POST
-    @Consumes(MediaType.TEXT_PLAIN)
+    @Path("/mock/{name}/{count}/{timeout}")
     @Produces(MediaType.TEXT_PLAIN)
-    public Response sqltx(String message) throws Exception {
-        String response = request("direct:sqltx", message);
-        LOG.infof("Got response from sqltx: %s", response);
-        return Response
-                .created(new URI("https://camel.apache.org/"))
-                .entity(response)
-                .build();
-    }
-
-    private String request(String endpoint, String message) throws Exception {
-        LOG.infof("message is %s", message);
-        MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
-        mockEndpoint.reset();
-        if (!message.equals("fail")) {
-            mockEndpoint.expectedMessageCount(1);
-            mockEndpoint.message(0).body().isEqualTo(message);
-        }
-        final String response = producerTemplate.requestBody(endpoint, message, String.class);
-        mockEndpoint.assertIsSatisfied(15000);
-
-        return response;
-    }
-
-    @Path("/mock")
     @GET
-    @Consumes(MediaType.TEXT_PLAIN)
-    @Produces(MediaType.TEXT_PLAIN)
-    public Response mock() throws Exception {
-        MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
-        List<Exchange> exchanges = mockEndpoint.getExchanges();
-        if (exchanges.isEmpty()) {
-            return Response.ok().entity("empty").build();
-        } else {
-            Message message = exchanges.get(0).getMessage();
-
-            LOG.infof("mock message is " + message.getBody());
-            return Response.ok().entity(message.getBody()).build();
+    public String mock(@PathParam("name") String name, @PathParam("count") int count, @PathParam("timeout") int timeout) {
+        MockEndpoint mock = context.getEndpoint("mock:" + name, MockEndpoint.class);
+        mock.setExpectedMessageCount(count);
+        try {
+            mock.assertIsSatisfied(timeout);
+        } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
         }
+        return mock.getExchanges().stream().map(e -> e.getMessage().getBody(String.class)).collect(Collectors.joining(","));
     }
 }
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
index 2828635..dec524f 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
@@ -51,11 +51,11 @@ public class JtaRoutes extends RouteBuilder {
         from("direct:jdbc")
                 .transacted()
                 .setHeader("message", body())
-                .to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+                .to("jms:queue:jdbc?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
                 .transform().simple("insert into example(message, origin) values ('${body}', 'jdbc')")
                 .to("jdbc:camel-ds?resetAutoCommit=false")
                 .choice()
-                .when(header("message").startsWith("fail"))
+                .when(header("message").startsWith("rollback"))
                 .log("Failing forever with exception")
                 .process(x -> {
                     throw new RuntimeException("Fail");
@@ -63,14 +63,32 @@ public class JtaRoutes extends RouteBuilder {
                 .otherwise()
                 .transform().simple("${header.message} added")
                 .endChoice();
+        from("jms:queue:jdbc?connectionFactory=#xaConnectionFactory")
+                .to("mock:jdbc");
+
+        from("direct:jdbcRollback")
+                .transacted()
+                .setHeader("message", body())
+                .to("jms:queue:jdbcRollback?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+                .transform().simple("insert into example(message, origin) values ('${body}', 'jdbcRollback')")
+                .to("jdbc:camel-ds?resetAutoCommit=false")
+                .choice()
+                .when(header("message").startsWith("rollback"))
+                .log("Rolling back after rollback message")
+                .rollback()
+                .otherwise()
+                .transform().simple("${header.message} added")
+                .endChoice();
+        from("jms:queue:jdbcRollback?connectionFactory=#xaConnectionFactory")
+                .to("mock:jdbcRollback");
 
         from("direct:sqltx")
                 .transacted()
                 .setHeader("message", body())
-                .to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+                .to("jms:queue:sqltx?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
                 .to("sql:insert into example(message, origin) values (:#message, 'sqltx')")
                 .choice()
-                .when(header("message").startsWith("fail"))
+                .when(header("message").startsWith("rollback"))
                 .log("Failing forever with exception")
                 .process(x -> {
                     throw new RuntimeException("Fail");
@@ -78,8 +96,23 @@ public class JtaRoutes extends RouteBuilder {
                 .otherwise()
                 .transform().simple("${header.message} added")
                 .endChoice();
+        from("jms:queue:sqltx?connectionFactory=#xaConnectionFactory")
+                .to("mock:sqltx");
+
+        from("direct:sqltxRollback")
+                .transacted()
+                .setHeader("message", body())
+                .to("jms:queue:sqltxRollback?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+                .to("sql:insert into example(message, origin) values (:#message, 'sqltxRollback')")
+                .choice()
+                .when(header("message").startsWith("rollback"))
+                .log("Rolling back after rollback message")
+                .rollback()
+                .otherwise()
+                .transform().simple("${header.message} added")
+                .endChoice();
+        from("jms:queue:sqltxRollback?connectionFactory=#xaConnectionFactory")
+                .to("mock:sqltxRollback");
 
-        from("jms:queue:txTest?connectionFactory=#xaConnectionFactory")
-                .to("mock:txResult");
     }
 }
diff --git a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
index 3c32225..146f351 100644
--- a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
+++ b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
@@ -21,6 +21,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.UUID;
 
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.common.ResourceArg;
@@ -29,6 +30,7 @@ import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 import org.apache.camel.quarkus.test.support.activemq.ActiveMQTestResource;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -150,47 +152,65 @@ class JtaTest {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = { "jdbc", "sqltx" })
-    public void testTx(String url) throws SQLException {
-        final String msg = java.util.UUID.randomUUID().toString().replace("-", "");
+    @ValueSource(strings = { "jdbc", "jdbcRollback", "sqltx", "sqltxRollback" })
+    public void testTx(String endpoint) throws SQLException {
+        final String msg = endpoint + ":" + UUID.randomUUID().toString().replace("-", "");
+
+        assertDBRows(endpoint);
+        RestAssured.get("/jta/mock/" + endpoint + "/0/1000")
+                .then()
+                .statusCode(200)
+                .body(Matchers.is(""));
 
         RestAssured.given()
                 .contentType(ContentType.TEXT)
                 .body(msg)
-                .post("/jta/" + url)
+                .post("/jta/route/" + endpoint)
                 .then()
                 .statusCode(201)
                 .body(is(msg + " added"));
 
         // One row inserted
-        assertDBRowCount(url, 1);
+        assertDBRows(endpoint, msg);
+        RestAssured.get("/jta/mock/" + endpoint + "/1/15000")
+                .then()
+                .statusCode(200)
+                .body(Matchers.is(msg));
 
         RestAssured.given()
                 .contentType(ContentType.TEXT)
-                .body("fail")
-                .post("/jta/" + url)
+                .body("rollback")
+                .post("/jta/route/" + endpoint)
                 .then()
                 .statusCode(500);
 
         // Should still have the original row as the other insert attempt was rolled back
-        assertDBRowCount(url, 1);
-
-        RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .get("/jta/mock")
+        assertDBRows(endpoint, msg);
+        RestAssured.get("/jta/mock/" + endpoint + "/1/15000")
                 .then()
                 .statusCode(200)
-                .body(is("empty"))
-                .log();
+                .body(Matchers.is(msg));
     }
 
-    private void assertDBRowCount(String source, int expectedRowCount) throws SQLException {
+    private void assertDBRows(String source, String... expectedMessages) throws SQLException {
         try (Connection connection = DriverManager.getConnection("jdbc:h2:tcp://localhost/mem:test")) {
             try (Statement statement = connection.createStatement()) {
                 try (ResultSet resultSet = statement
-                        .executeQuery("SELECT count(*) FROM example WHERE origin = '" + source + "'")) {
+                        .executeQuery("SELECT message FROM example WHERE origin = '" + source + "' ORDER BY id")) {
+                    int i = 0;
+                    for (; i < expectedMessages.length; i++) {
+                        String expectedMessage = expectedMessages[i];
+                        if (resultSet.next()) {
+                            Assertions.assertEquals(expectedMessage, resultSet.getString(1));
+                        } else {
+                            Assertions
+                                    .fail("Expected message '" + expectedMessage + "' for origin '" + source + "' at index " + i
+                                            + "; found: end of list");
+                        }
+                    }
                     if (resultSet.next()) {
-                        Assertions.assertEquals(expectedRowCount, resultSet.getInt(1));
+                        Assertions.fail("Expected end of list '" + source + "' at index " + i + "; found message: "
+                                + resultSet.getString(1));
                     }
                 }
             }