You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2021/07/29 06:08:42 UTC
[camel-quarkus] 01/10: Test rollback() EIP #2628
This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit c1e13bad0419003f876d39fa412c973a8902831d
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Fri Jul 23 18:57:11 2021 +0200
Test rollback() EIP #2628
---
.../quarkus/component/jta/it/JtaResource.java | 70 +++++++---------------
.../camel/quarkus/component/jta/it/JtaRoutes.java | 45 ++++++++++++--
.../camel/quarkus/component/jta/it/JtaTest.java | 54 +++++++++++------
3 files changed, 96 insertions(+), 73 deletions(-)
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
index 588f8d9..c67c9b9 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
@@ -20,10 +20,10 @@ import java.net.URI;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.List;
+import java.util.stream.Collectors;
-import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
@@ -38,10 +38,9 @@ import javax.ws.rs.core.Response;
import io.agroal.api.AgroalDataSource;
import io.quarkus.agroal.DataSource;
import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.quarkus.main.events.AfterStart;
import org.jboss.logging.Logger;
@Path("/jta")
@@ -59,16 +58,16 @@ public class JtaResource {
@Inject
CamelContext context;
- @PostConstruct
- void postConstruct() throws SQLException {
+ void postConstruct(@Observes AfterStart event) throws SQLException {
try (Connection conn = dataSource.getConnection()) {
try (Statement statement = conn.createStatement()) {
+ LOG.info("Recreating table 'example'");
try {
statement.execute("drop table example");
} catch (Exception ignored) {
}
statement.execute(
- "create table example (id serial primary key, message varchar(255) not null, origin varchar(5) not null)");
+ "create table example (id serial primary key, message varchar(255) not null, origin varchar(255) not null)");
}
}
}
@@ -96,60 +95,31 @@ public class JtaResource {
return post(policy, message);
}
- @Path("/jdbc")
+ @Path("/route/{route}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
- public Response jdbc(String message) throws Exception {
- String response = request("direct:jdbc", message);
- LOG.infof("Got response from jdbc: %s", response);
+ public Response route(@PathParam("route") String route, String message) throws Exception {
+ LOG.infof("message is %s", message);
+ String response = producerTemplate.requestBody("direct:" + route, message, String.class);
+ LOG.infof("Got response from %s: %s", route, response);
return Response
.created(new URI("https://camel.apache.org/"))
.entity(response)
.build();
}
- @Path("/sqltx")
- @POST
- @Consumes(MediaType.TEXT_PLAIN)
+ @Path("/mock/{name}/{count}/{timeout}")
@Produces(MediaType.TEXT_PLAIN)
- public Response sqltx(String message) throws Exception {
- String response = request("direct:sqltx", message);
- LOG.infof("Got response from sqltx: %s", response);
- return Response
- .created(new URI("https://camel.apache.org/"))
- .entity(response)
- .build();
- }
-
- private String request(String endpoint, String message) throws Exception {
- LOG.infof("message is %s", message);
- MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
- mockEndpoint.reset();
- if (!message.equals("fail")) {
- mockEndpoint.expectedMessageCount(1);
- mockEndpoint.message(0).body().isEqualTo(message);
- }
- final String response = producerTemplate.requestBody(endpoint, message, String.class);
- mockEndpoint.assertIsSatisfied(15000);
-
- return response;
- }
-
- @Path("/mock")
@GET
- @Consumes(MediaType.TEXT_PLAIN)
- @Produces(MediaType.TEXT_PLAIN)
- public Response mock() throws Exception {
- MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
- List<Exchange> exchanges = mockEndpoint.getExchanges();
- if (exchanges.isEmpty()) {
- return Response.ok().entity("empty").build();
- } else {
- Message message = exchanges.get(0).getMessage();
-
- LOG.infof("mock message is " + message.getBody());
- return Response.ok().entity(message.getBody()).build();
+ public String mock(@PathParam("name") String name, @PathParam("count") int count, @PathParam("timeout") int timeout) {
+ MockEndpoint mock = context.getEndpoint("mock:" + name, MockEndpoint.class);
+ mock.setExpectedMessageCount(count);
+ try {
+ mock.assertIsSatisfied(timeout);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
}
+ return mock.getExchanges().stream().map(e -> e.getMessage().getBody(String.class)).collect(Collectors.joining(","));
}
}
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
index 2828635..dec524f 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
@@ -51,11 +51,11 @@ public class JtaRoutes extends RouteBuilder {
from("direct:jdbc")
.transacted()
.setHeader("message", body())
- .to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+ .to("jms:queue:jdbc?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
.transform().simple("insert into example(message, origin) values ('${body}', 'jdbc')")
.to("jdbc:camel-ds?resetAutoCommit=false")
.choice()
- .when(header("message").startsWith("fail"))
+ .when(header("message").startsWith("rollback"))
.log("Failing forever with exception")
.process(x -> {
throw new RuntimeException("Fail");
@@ -63,14 +63,32 @@ public class JtaRoutes extends RouteBuilder {
.otherwise()
.transform().simple("${header.message} added")
.endChoice();
+ from("jms:queue:jdbc?connectionFactory=#xaConnectionFactory")
+ .to("mock:jdbc");
+
+ from("direct:jdbcRollback")
+ .transacted()
+ .setHeader("message", body())
+ .to("jms:queue:jdbcRollback?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+ .transform().simple("insert into example(message, origin) values ('${body}', 'jdbcRollback')")
+ .to("jdbc:camel-ds?resetAutoCommit=false")
+ .choice()
+ .when(header("message").startsWith("rollback"))
+ .log("Rolling back after rollback message")
+ .rollback()
+ .otherwise()
+ .transform().simple("${header.message} added")
+ .endChoice();
+ from("jms:queue:jdbcRollback?connectionFactory=#xaConnectionFactory")
+ .to("mock:jdbcRollback");
from("direct:sqltx")
.transacted()
.setHeader("message", body())
- .to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+ .to("jms:queue:sqltx?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
.to("sql:insert into example(message, origin) values (:#message, 'sqltx')")
.choice()
- .when(header("message").startsWith("fail"))
+ .when(header("message").startsWith("rollback"))
.log("Failing forever with exception")
.process(x -> {
throw new RuntimeException("Fail");
@@ -78,8 +96,23 @@ public class JtaRoutes extends RouteBuilder {
.otherwise()
.transform().simple("${header.message} added")
.endChoice();
+ from("jms:queue:sqltx?connectionFactory=#xaConnectionFactory")
+ .to("mock:sqltx");
+
+ from("direct:sqltxRollback")
+ .transacted()
+ .setHeader("message", body())
+ .to("jms:queue:sqltxRollback?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+ .to("sql:insert into example(message, origin) values (:#message, 'sqltxRollback')")
+ .choice()
+ .when(header("message").startsWith("rollback"))
+ .log("Rolling back after rollback message")
+ .rollback()
+ .otherwise()
+ .transform().simple("${header.message} added")
+ .endChoice();
+ from("jms:queue:sqltxRollback?connectionFactory=#xaConnectionFactory")
+ .to("mock:sqltxRollback");
- from("jms:queue:txTest?connectionFactory=#xaConnectionFactory")
- .to("mock:txResult");
}
}
diff --git a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
index 3c32225..146f351 100644
--- a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
+++ b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
@@ -21,6 +21,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.UUID;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.ResourceArg;
@@ -29,6 +30,7 @@ import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.camel.quarkus.test.support.activemq.ActiveMQTestResource;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -150,47 +152,65 @@ class JtaTest {
}
@ParameterizedTest
- @ValueSource(strings = { "jdbc", "sqltx" })
- public void testTx(String url) throws SQLException {
- final String msg = java.util.UUID.randomUUID().toString().replace("-", "");
+ @ValueSource(strings = { "jdbc", "jdbcRollback", "sqltx", "sqltxRollback" })
+ public void testTx(String endpoint) throws SQLException {
+ final String msg = endpoint + ":" + UUID.randomUUID().toString().replace("-", "");
+
+ assertDBRows(endpoint);
+ RestAssured.get("/jta/mock/" + endpoint + "/0/1000")
+ .then()
+ .statusCode(200)
+ .body(Matchers.is(""));
RestAssured.given()
.contentType(ContentType.TEXT)
.body(msg)
- .post("/jta/" + url)
+ .post("/jta/route/" + endpoint)
.then()
.statusCode(201)
.body(is(msg + " added"));
// One row inserted
- assertDBRowCount(url, 1);
+ assertDBRows(endpoint, msg);
+ RestAssured.get("/jta/mock/" + endpoint + "/1/15000")
+ .then()
+ .statusCode(200)
+ .body(Matchers.is(msg));
RestAssured.given()
.contentType(ContentType.TEXT)
- .body("fail")
- .post("/jta/" + url)
+ .body("rollback")
+ .post("/jta/route/" + endpoint)
.then()
.statusCode(500);
// Should still have the original row as the other insert attempt was rolled back
- assertDBRowCount(url, 1);
-
- RestAssured.given()
- .contentType(ContentType.TEXT)
- .get("/jta/mock")
+ assertDBRows(endpoint, msg);
+ RestAssured.get("/jta/mock/" + endpoint + "/1/15000")
.then()
.statusCode(200)
- .body(is("empty"))
- .log();
+ .body(Matchers.is(msg));
}
- private void assertDBRowCount(String source, int expectedRowCount) throws SQLException {
+ private void assertDBRows(String source, String... expectedMessages) throws SQLException {
try (Connection connection = DriverManager.getConnection("jdbc:h2:tcp://localhost/mem:test")) {
try (Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement
- .executeQuery("SELECT count(*) FROM example WHERE origin = '" + source + "'")) {
+ .executeQuery("SELECT message FROM example WHERE origin = '" + source + "' ORDER BY id")) {
+ int i = 0;
+ for (; i < expectedMessages.length; i++) {
+ String expectedMessage = expectedMessages[i];
+ if (resultSet.next()) {
+ Assertions.assertEquals(expectedMessage, resultSet.getString(1));
+ } else {
+ Assertions
+ .fail("Expected message '" + expectedMessage + "' for origin '" + source + "' at index " + i
+ + "; found: end of list");
+ }
+ }
if (resultSet.next()) {
- Assertions.assertEquals(expectedRowCount, resultSet.getInt(1));
+ Assertions.fail("Expected end of list '" + source + "' at index " + i + "; found message: "
+ + resultSet.getString(1));
}
}
}