You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2021/03/02 08:05:42 UTC

[camel-quarkus] 03/05: Fix #1918 to add an integration test with camel-jdbc and camel-jms

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch quarkus-master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 7fbf9ee47608baf1c02e8b5972905c3a977b9c42
Author: Amos Feng <zf...@redhat.com>
AuthorDate: Tue Dec 22 13:40:51 2020 +0800

    Fix #1918 to add an integration test with camel-jdbc and camel-jms
---
 integration-tests/jta/pom.xml                      | 30 +++++++++
 .../quarkus/component/jta/it/JtaResource.java      | 71 +++++++++++++++++++-
 .../camel/quarkus/component/jta/it/JtaRoutes.java  | 19 ++++++
 .../jta/it/XAConnectionFactoryConfiguration.java   | 44 +++++++++++++
 .../jta/src/main/resources/application.properties  | 24 +++++++
 .../component/jta/it/ActiveMQXATestResource.java   | 76 ++++++++++++++++++++++
 .../camel/quarkus/component/jta/it/JtaTest.java    | 32 +++++++++
 7 files changed, 295 insertions(+), 1 deletion(-)

diff --git a/integration-tests/jta/pom.xml b/integration-tests/jta/pom.xml
index 78c6cdb..5692a3a 100644
--- a/integration-tests/jta/pom.xml
+++ b/integration-tests/jta/pom.xml
@@ -49,12 +49,32 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-jta</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-jms</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-jdbc</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-resteasy</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-jdbc-h2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-artemis-jms</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
@@ -63,10 +83,20 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-test-h2</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>io.rest-assured</groupId>
             <artifactId>rest-assured</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-integration-testcontainers-support</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
         <dependency>
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
index 5c757a5..e73a658 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaResource.java
@@ -17,11 +17,17 @@
 package org.apache.camel.quarkus.component.jta.it;
 
 import java.net.URI;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
 
+import javax.annotation.PostConstruct;
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 import javax.transaction.Transactional;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -29,18 +35,43 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import io.agroal.api.AgroalDataSource;
+import io.quarkus.agroal.DataSource;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.jboss.logging.Logger;
 
 @Path("/jta")
 @ApplicationScoped
 public class JtaResource {
-
     private static final Logger LOG = Logger.getLogger(JtaResource.class);
 
     @Inject
+    @DataSource("camel-ds")
+    AgroalDataSource dataSource;
+
+    @Inject
     ProducerTemplate producerTemplate;
 
+    @Inject
+    CamelContext context;
+
+    /**
+     * Recreates the {@code example} table on the injected {@code camel-ds} datasource.
+     * @throws SQLException if the connection cannot be obtained or a statement fails
+     */
+    @PostConstruct
+    void postConstruct() throws SQLException {
+        try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
+            // IF EXISTS makes the drop a no-op when the table is absent, so real failures are not swallowed
+            statement.execute("drop table if exists example");
+            statement.execute("create table example (id serial primary key, message varchar(255) not null)");
+        }
+    }
+
     @Path("/{policy}")
     @POST
     @Consumes(MediaType.TEXT_PLAIN)
@@ -64,4 +95,42 @@ public class JtaResource {
         return post(policy, message);
     }
 
+    /** Sends the body through {@code direct:transaction} and verifies what reached {@code mock:txResult}. */
+    @Path("/jdbc")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response jdbc(String message) throws Exception {
+        LOG.infof("message is %s", message);
+        final MockEndpoint txResult = context.getEndpoint("mock:txResult", MockEndpoint.class);
+        txResult.reset();
+        final boolean expectDelivery = !message.equals("fail");
+        if (expectDelivery) {
+            // exactly one JMS message carrying the original body must arrive
+            txResult.expectedMessageCount(1);
+            txResult.message(0).body().isEqualTo(message);
+        }
+        final String reply = producerTemplate.requestBody("direct:transaction", message, String.class);
+        txResult.assertIsSatisfied(15000);
+        LOG.infof("Got response from jta: %s", reply);
+        final URI location = new URI("https://camel.apache.org/");
+        return Response.created(location).entity(reply).build();
+    }
+
+    /** Returns the body of the first exchange received by {@code mock:txResult}, or {@code "empty"} if none. */
+    @Path("/mock")
+    @GET
+    @Consumes(MediaType.TEXT_PLAIN)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response mock() throws Exception {
+        MockEndpoint mockEndpoint = context.getEndpoint("mock:txResult", MockEndpoint.class);
+        List<Exchange> exchanges = mockEndpoint.getExchanges();
+        if (exchanges.isEmpty()) {
+            return Response.ok().entity("empty").build();
+        }
+        Message message = exchanges.get(0).getMessage();
+        // pass the body as a format argument so a '%' in it is not parsed as a format specifier
+        LOG.infof("mock message is %s", message.getBody());
+        return Response.ok().entity(message.getBody()).build();
+    }
 }
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
index a2fd01f..bb6b277 100644
--- a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/JtaRoutes.java
@@ -47,5 +47,24 @@ public class JtaRoutes extends RouteBuilder {
 
         from("direct:not_supported")
                 .transacted("PROPAGATION_NOT_SUPPORTED").transform().constant("not_supported");
+
+        from("direct:transaction")
+                .transacted()
+                .setHeader("message", body())
+                .to("jms:queue:txTest?connectionFactory=#xaConnectionFactory&disableReplyTo=true")
+                .transform().simple("insert into example(message) values ('${body}')")
+                .to("jdbc:camel-ds?resetAutoCommit=false")
+                .choice()
+                .when(header("message").startsWith("fail"))
+                .log("Failing forever with exception")
+                .process(x -> {
+                    throw new RuntimeException("Fail");
+                })
+                .otherwise()
+                .transform().simple("${header.message} added")
+                .endChoice();
+
+        from("jms:queue:txTest?connectionFactory=#xaConnectionFactory")
+                .to("mock:txResult");
     }
 }
diff --git a/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/XAConnectionFactoryConfiguration.java b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/XAConnectionFactoryConfiguration.java
new file mode 100644
index 0000000..f5d487b
--- /dev/null
+++ b/integration-tests/jta/src/main/java/org/apache/camel/quarkus/component/jta/it/XAConnectionFactoryConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jta.it;
+
+import javax.enterprise.context.Dependent;
+import javax.enterprise.inject.Produces;
+import javax.inject.Named;
+import javax.jms.ConnectionFactory;
+import javax.jms.XAConnectionFactory;
+import javax.transaction.TransactionManager;
+
+import io.quarkus.artemis.core.runtime.ArtemisRuntimeConfig;
+import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
+import org.jboss.narayana.jta.jms.ConnectionFactoryProxy;
+import org.jboss.narayana.jta.jms.TransactionHelperImpl;
+
+/**
+ * Produces the {@code xaConnectionFactory} bean. Remove once https://github.com/quarkusio/quarkus/issues/14871
+ * is resolved and the ConnectionFactory can be integrated with the TransactionManager.
+ */
+@Dependent
+public class XAConnectionFactoryConfiguration {
+    @Produces
+    @Named("xaConnectionFactory")
+    public ConnectionFactory getXAConnectionFactory(TransactionManager tm, ArtemisRuntimeConfig config) {
+        final XAConnectionFactory artemisXa = new ActiveMQXAConnectionFactory(config.url,
+                config.username.orElse(null), config.password.orElse(null));
+        return new ConnectionFactoryProxy(artemisXa, new TransactionHelperImpl(tm));
+    }
+}
diff --git a/integration-tests/jta/src/main/resources/application.properties b/integration-tests/jta/src/main/resources/application.properties
new file mode 100644
index 0000000..dcd0d95
--- /dev/null
+++ b/integration-tests/jta/src/main/resources/application.properties
@@ -0,0 +1,24 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# Quarkus :: DS
+#
+quarkus.datasource.camel-ds.jdbc.url=jdbc:h2:tcp://localhost/mem:test
+quarkus.datasource.camel-ds.db-kind=h2
+quarkus.datasource.camel-ds.jdbc.max-size=8
+quarkus.datasource.camel-ds.jdbc.transactions=xa
diff --git a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/ActiveMQXATestResource.java b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/ActiveMQXATestResource.java
new file mode 100644
index 0000000..823b311
--- /dev/null
+++ b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/ActiveMQXATestResource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jta.it;
+
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.camel.util.CollectionHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.TestcontainersConfiguration;
+
+/**
+ * Test resource that runs an ActiveMQ Artemis broker in a container and hands its
+ * connection settings back as {@code quarkus.artemis.*} properties.
+ */
+public class ActiveMQXATestResource implements QuarkusTestResourceLifecycleManager {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQXATestResource.class);
+    private static final String ACTIVEMQ_IMAGE = "vromero/activemq-artemis:2.11.0-alpine";
+    private static final String ACTIVEMQ_USERNAME = "artemis";
+    private static final String ACTIVEMQ_PASSWORD = "simetraehcapa";
+    private static final int ACTIVEMQ_PORT = 61616;
+
+    private GenericContainer<?> container;
+
+    /** Starts the broker container and returns its URL and credentials; any failure is rethrown unchecked. */
+    @Override
+    public Map<String, String> start() {
+        LOGGER.info(TestcontainersConfiguration.getInstance().toString());
+        try {
+            container = new GenericContainer<>(ACTIVEMQ_IMAGE)
+                    .withExposedPorts(ACTIVEMQ_PORT)
+                    .withLogConsumer(new Slf4jLogConsumer(LOGGER))
+                    .withEnv("BROKER_CONFIG_MAX_DISK_USAGE", "100")
+                    .waitingFor(Wait.forListeningPort());
+            container.start();
+            String brokerUrlTcp = String.format("tcp://%s:%d/", container.getContainerIpAddress(),
+                    container.getMappedPort(ACTIVEMQ_PORT));
+            return CollectionHelper.mapOf(
+                    "quarkus.artemis.url", brokerUrlTcp,
+                    "quarkus.artemis.username", ACTIVEMQ_USERNAME,
+                    "quarkus.artemis.password", ACTIVEMQ_PASSWORD);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Stops the container if one was created; a failure is logged rather than propagated. */
+    @Override
+    public void stop() {
+        try {
+            if (container != null) {
+                container.stop();
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Failed to stop the ActiveMQ Artemis container", e);
+        }
+    }
+}
diff --git a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
index bf7382c..95dcd1c 100644
--- a/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
+++ b/integration-tests/jta/src/test/java/org/apache/camel/quarkus/component/jta/it/JtaTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.quarkus.component.jta.it;
 
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.h2.H2DatabaseTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
@@ -24,6 +26,8 @@ import org.junit.jupiter.api.Test;
 import static org.hamcrest.Matchers.is;
 
 @QuarkusTest
+@QuarkusTestResource(H2DatabaseTestResource.class)
+@QuarkusTestResource(ActiveMQXATestResource.class)
 class JtaTest {
 
     @Test
@@ -131,4 +135,32 @@ class JtaTest {
                 .statusCode(201)
                 .body(is("not_supported"));
     }
+
+    @Test
+    public void testJdbcInTx() {
+        final String msg = java.util.UUID.randomUUID().toString().replace("-", "");
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .body(msg)
+                .post("/jta/jdbc")
+                .then()
+                .statusCode(201)
+                .body(is(msg + " added"));
+
+        // the "fail" body makes the route throw, which surfaces as a server error
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .body("fail")
+                .post("/jta/jdbc")
+                .then()
+                .statusCode(500);
+
+        // nothing from the failed request may have reached mock:txResult
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .get("/jta/mock")
+                .then()
+                .statusCode(200)
+                .body(is("empty"));
+    }
 }