You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ff...@apache.org on 2022/03/08 22:43:35 UTC
[camel-spring-boot] branch main updated: [CAMEL-17758]add tests in camel-cassandraql-starter (#458)
This is an automated email from the ASF dual-hosted git repository.
ffang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push:
new f43a4b4 [CAMEL-17758]add tests in camel-cassandraql-starter (#458)
f43a4b4 is described below
commit f43a4b40a226ba3a2a28b8716230669c5e5e150a
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Tue Mar 8 17:38:51 2022 -0500
[CAMEL-17758]add tests in camel-cassandraql-starter (#458)
(cherry picked from commit c7335a6467079c6bc1901037681d351597834752)
---
.../camel-cassandraql-starter/pom.xml | 70 ++++++
.../cassandra/MockLoadBalancingPolicy.java | 43 ++++
.../integration/CassandraComponentBeanRefIT.java | 92 ++++++++
.../integration/CassandraComponentConsumerIT.java | 134 ++++++++++++
.../integration/CassandraComponentProducerIT.java | 234 +++++++++++++++++++++
.../CassandraComponentProducerUnpreparedIT.java | 156 ++++++++++++++
.../cassandra/springboot/BaseCassandra.java | 113 ++++++++++
.../cassandra/CassandraAggregationIT.java | 123 +++++++++++
.../CassandraAggregationRepositoryIT.java | 222 +++++++++++++++++++
.../CassandraAggregationSerializedHeadersIT.java | 128 +++++++++++
.../NamedCassandraAggregationRepositoryIT.java | 227 ++++++++++++++++++++
.../camel/processor/aggregate/util/HeaderDto.java | 62 ++++++
.../cassandra/CassandraIdempotentIT.java | 96 +++++++++
.../cassandra/CassandraIdempotentRepositoryIT.java | 153 ++++++++++++++
.../NamedCassandraIdempotentRepositoryIT.java | 155 ++++++++++++++
.../src/test/resources/BasicDataSet.cql | 6 +
.../src/test/resources/IdempotentDataSet.cql | 8 +
.../src/test/resources/NamedIdempotentDataSet.cql | 8 +
.../src/test/resources/initScript.cql | 33 +++
19 files changed, 2063 insertions(+)
diff --git a/components-starter/camel-cassandraql-starter/pom.xml b/components-starter/camel-cassandraql-starter/pom.xml
index 347a961..36a0f63 100644
--- a/components-starter/camel-cassandraql-starter/pom.xml
+++ b/components-starter/camel-cassandraql-starter/pom.xml
@@ -39,6 +39,14 @@
<artifactId>camel-cassandraql</artifactId>
<version>${camel-version}</version>
</dependency>
+ <!-- test infra -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-cassandra</artifactId>
+ <version>${camel-version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!--START OF GENERATED CODE-->
<dependency>
<groupId>com.datastax.oss</groupId>
@@ -56,4 +64,66 @@
</dependency>
<!--END OF GENERATED CODE-->
</dependencies>
+ <profiles>
+ <!-- activate integration test if the docker socket file is accessible -->
+ <profile>
+ <id>cassandraql-integration-tests-docker-file</id>
+ <activation>
+ <file>
+ <exists>/var/run/docker.sock</exists>
+ </file>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <visibleassertions.silence>true</visibleassertions.silence>
+ </systemPropertyVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- activate integration test if the DOCKER_HOST env var is set -->
+ <profile>
+ <id>cassandraql-integration-tests-docker-env</id>
+ <activation>
+ <property>
+ <name>env.DOCKER_HOST</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <visibleassertions.silence>true</visibleassertions.silence>
+ </systemPropertyVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java
new file mode 100644
index 0000000..8a68c07
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra;
+
+import java.util.Queue;
+
+import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.session.Request;
+import com.datastax.oss.driver.api.core.session.Session;
+import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+public class MockLoadBalancingPolicy extends DefaultLoadBalancingPolicy {
+
+ public static boolean used;
+
+ public MockLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
+ super(context, profileName);
+ }
+
+ @NonNull
+ @Override
+ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
+ MockLoadBalancingPolicy.used = true;
+ return super.newQueryPlan(request, session);
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java
new file mode 100644
index 0000000..7237352
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+
+
+import com.datastax.oss.driver.api.core.CqlSession;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraEndpoint;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraComponentBeanRefIT.class,
+ CassandraComponentBeanRefIT.TestConfiguration.class
+ }
+)
+public class CassandraComponentBeanRefIT extends BaseCassandra {
+
+ public static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+ public static final String SESSION_URI = "cql:bean:cassandraSession?cql=" + CQL;
+
+ @Bean("cassandraSession")
+ protected CqlSession createSession() {
+
+ return getSession();
+ }
+
+
+
+
+ @Test
+ public void testSession() {
+
+
+ CassandraEndpoint endpoint = context.getEndpoint(SESSION_URI, CassandraEndpoint.class);
+ assertNotNull(endpoint, "No endpoint found for uri: " + SESSION_URI);
+
+ assertEquals(KEYSPACE_NAME, endpoint.getKeyspace());
+ assertEquals(CQL, endpoint.getCql());
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:inputSession").to(SESSION_URI);
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java
new file mode 100644
index 0000000..676da47
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.oss.driver.api.core.cql.Row;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraComponentConsumerIT.class,
+ CassandraComponentConsumerIT.TestConfiguration.class
+ }
+)
+public class CassandraComponentConsumerIT extends BaseCassandra {
+
+ static final String CQL = "select login, first_name, last_name from camel_user";
+
+ @EndpointInject("mock:resultAll")
+ MockEndpoint mock;
+
+ @EndpointInject("mock:resultUnprepared")
+ MockEndpoint mockResulutUnprepared;
+
+ @EndpointInject("mock:resultOne")
+ MockEndpoint mockResulutOne;
+
+
+ @Test
+ public void testConsumeAll() throws Exception {
+
+ mock.expectedMinimumMessageCount(1);
+ mock.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ Object body = exchange.getIn().getBody();
+ assertTrue(body instanceof List);
+ }
+ });
+ mock.await(1, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testConsumeUnprepared() throws Exception {
+
+ mockResulutUnprepared.expectedMinimumMessageCount(1);
+ mockResulutUnprepared.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ Object body = exchange.getIn().getBody();
+ assertTrue(body instanceof List);
+ }
+ });
+ mockResulutUnprepared.await(1, TimeUnit.SECONDS);
+ mockResulutUnprepared.assertIsSatisfied();
+ }
+
+ @Test
+ public void testConsumeOne() throws Exception {
+
+ mockResulutOne.expectedMinimumMessageCount(1);
+ mockResulutOne.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ Object body = exchange.getIn().getBody();
+ assertTrue(body instanceof Row);
+ }
+ });
+ mock.await(1, TimeUnit.SECONDS);
+
+ mockResulutOne.assertIsSatisfied();
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)).to("mock:resultAll");
+ from(String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL))
+ .to("mock:resultUnprepared");
+ from(String.format("cql://%s/%s?cql=%s&resultSetConversionStrategy=ONE", getUrl(), KEYSPACE_NAME, CQL))
+ .to("mock:resultOne");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java
new file mode 100644
index 0000000..f17e502
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.update.Update;
+
+
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraConstants;
+import org.apache.camel.component.cassandra.CassandraEndpoint;
+import org.apache.camel.component.cassandra.MockLoadBalancingPolicy;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraComponentProducerIT.class,
+ CassandraComponentProducerIT.TestConfiguration.class
+ }
+)
+public class CassandraComponentProducerIT extends BaseCassandra {
+
+ static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+ static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user";
+
+ @Produce("direct:input")
+ ProducerTemplate producerTemplate;
+
+ @Produce("direct:inputNoParameter")
+ ProducerTemplate noParameterProducerTemplate;
+
+ @Produce("direct:inputNotConsistent")
+ ProducerTemplate notConsistentProducerTemplate;
+
+ @Produce("direct:loadBalancingPolicy")
+ ProducerTemplate loadBalancingPolicyTemplate;
+
+ @Produce("direct:inputNoEndpointCql")
+ ProducerTemplate producerTemplateNoEndpointCql;
+
+
+ @Test
+ public void testRequestUriCql() {
+ producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang"));
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "w_jiang"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Willem", row.getString("first_name"));
+ assertEquals("Jiang", row.getString("last_name"));
+ }
+
+ @Test
+ public void testRequestNoParameterNull() {
+ Object response = noParameterProducerTemplate.requestBody(null);
+
+ assertNotNull(response);
+ assertIsInstanceOf(List.class, response);
+ }
+
+ @Test
+ public void testRequestNoParameterEmpty() {
+ Object response = noParameterProducerTemplate.requestBody(Collections.emptyList());
+
+ assertNotNull(response);
+ assertIsInstanceOf(List.class, response);
+ }
+
+ @Test
+ public void testRequestMessageCql() {
+ producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+ "update camel_user set first_name=?, last_name=? where login=?");
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Claus 2", row.getString("first_name"));
+ assertEquals("Ibsen 2", row.getString("last_name"));
+ }
+
+ @Test
+ public void testLoadBalancing() {
+ loadBalancingPolicyTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" },
+ CassandraConstants.CQL_QUERY,
+ "update camel_user set first_name=?, last_name=? where login=?");
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Claus 2", row.getString("first_name"));
+ assertEquals("Ibsen 2", row.getString("last_name"));
+
+ Assertions.assertTrue(MockLoadBalancingPolicy.used);
+ }
+
+ /**
+ * Test with incoming message containing a header with RegularStatement.
+ */
+ @Test
+ public void testRequestMessageStatement() {
+
+ Update update = QueryBuilder.update("camel_user")
+ .setColumn("first_name", bindMarker())
+ .setColumn("last_name", bindMarker())
+ .whereColumn("login").isEqualTo(bindMarker());
+ producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+ update.build());
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Claus 2", row.getString("first_name"));
+ assertEquals("Ibsen 2", row.getString("last_name"));
+ }
+
+ /**
+ * Simulate different CQL statements in the incoming message containing a header with RegularStatement, justifying
+ * the cassandracql endpoint not containing a "cql" Uri parameter
+ */
+ @Test
+ public void testEndpointNoCqlParameter() {
+ Update update = QueryBuilder.update("camel_user")
+ .setColumn("first_name", bindMarker())
+ .whereColumn("login").isEqualTo(bindMarker());
+ producerTemplateNoEndpointCql.sendBodyAndHeader(new Object[] { "Claus 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+ update.build());
+
+ ResultSet resultSet1 = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row1 = resultSet1.one();
+ assertNotNull(row1);
+ assertEquals("Claus 2", row1.getString("first_name"));
+ assertEquals("Ibsen", row1.getString("last_name"));
+
+ update = QueryBuilder.update("camel_user")
+ .setColumn("last_name", bindMarker())
+ .whereColumn("login").isEqualTo(bindMarker());
+ producerTemplateNoEndpointCql.sendBodyAndHeader(new Object[] { "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+ update.build());
+
+ ResultSet resultSet2 = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row2 = resultSet2.one();
+ assertNotNull(row2);
+ assertEquals("Claus 2", row2.getString("first_name"));
+ assertEquals("Ibsen 2", row2.getString("last_name"));
+ }
+
+ @Test
+ public void testRequestNotConsistent() {
+ CassandraEndpoint endpoint
+ = context.getEndpoint(String.format("cql://%s/%s?cql=%s&consistencyLevel=ANY", getUrl(), KEYSPACE_NAME, CQL),
+ CassandraEndpoint.class);
+ assertEquals(ConsistencyLevel.ANY, endpoint.getConsistencyLevel());
+
+ notConsistentProducerTemplate.requestBody(Arrays.asList("j_anstey", "Jonathan", "Anstey"));
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:input").to(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL));
+ from("direct:inputNoParameter")
+ .to(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL));
+ from("direct:loadBalancingPolicy").to(String.format(
+ "cql://%s/%s?cql=%s&loadBalancingPolicyClass=org.apache.camel.component.cassandra.MockLoadBalancingPolicy",
+ getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL));
+ from("direct:inputNotConsistent")
+ .to(String.format("cql://%s/%s?cql=%s&consistencyLevel=ANY", getUrl(), KEYSPACE_NAME, CQL));
+ from("direct:inputNoEndpointCql").to(String.format("cql://%s/%s", getUrl(), KEYSPACE_NAME));
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java
new file mode 100644
index 0000000..c8bc4f1
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.update.Update;
+
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraConstants;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraComponentProducerUnpreparedIT.class,
+ CassandraComponentProducerUnpreparedIT.TestConfiguration.class
+ }
+)
+public class CassandraComponentProducerUnpreparedIT extends BaseCassandra {
+
+ static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+ static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user";
+
+
+
+
+ @Produce("direct:input")
+ ProducerTemplate producerTemplate;
+
+ @Produce("direct:inputNoParameter")
+ ProducerTemplate noParameterProducerTemplate;
+
+
+ @Test
+ public void testRequestUriCql() {
+ producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang"));
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "w_jiang"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Willem", row.getString("first_name"));
+ assertEquals("Jiang", row.getString("last_name"));
+ }
+
+ @Test
+ public void testRequestNoParameterNull() {
+ Object response = noParameterProducerTemplate.requestBody(null);
+
+ assertNotNull(response);
+ assertIsInstanceOf(List.class, response);
+ }
+
+ @Test
+ public void testRequestNoParameterEmpty() {
+ Object response = noParameterProducerTemplate.requestBody(null);
+
+ assertNotNull(response);
+ assertIsInstanceOf(List.class, response);
+ }
+
+ @Test
+ public void testRequestMessageCql() {
+ producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+ "update camel_user set first_name=?, last_name=? where login=?");
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Claus 2", row.getString("first_name"));
+ assertEquals("Ibsen 2", row.getString("last_name"));
+ }
+
+ /**
+ * Test with incoming message containing a header with RegularStatement.
+ */
+ @Test
+ public void testRequestMessageStatement() {
+ Update update = QueryBuilder.update("camel_user")
+ .setColumn("first_name", literal("Claus 2"))
+ .setColumn("last_name", literal("Ibsen 2"))
+ .whereColumn("login").isEqualTo(literal("c_ibsen"));
+ producerTemplate.requestBodyAndHeader(null, CassandraConstants.CQL_QUERY, update.build());
+
+ ResultSet resultSet = getSession()
+ .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Claus 2", row.getString("first_name"));
+ assertEquals("Ibsen 2", row.getString("last_name"));
+ }
+
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:input")
+ .to(String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL));
+ from("direct:inputNoParameter").to(
+ String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL));
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java
new file mode 100644
index 0000000..dd26263
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.springboot;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.test.infra.cassandra.services.CassandraLocalContainerService;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+public class BaseCassandra {
+
+ @Autowired
+ protected CamelContext context;
+
+ @Autowired
+ protected ProducerTemplate template;
+
+ @RegisterExtension
+ public static CassandraLocalContainerService service;
+
+ public static final String KEYSPACE_NAME = "camel_ks";
+ public static final String DATACENTER_NAME = "datacenter1";
+
+ private CqlSession session;
+
+ static {
+ service = new CassandraLocalContainerService();
+
+ service.getContainer()
+ .withInitScript("initScript.cql")
+ .withNetworkAliases("cassandra");
+
+ }
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ executeScript("BasicDataSet.cql");
+
+ }
+
+ public void executeScript(String pathToScript) throws IOException {
+ String s = IOUtils.toString(getClass().getResourceAsStream("/" + pathToScript), "UTF-8");
+ String[] statements = s.split(";");
+ for (int i = 0; i < statements.length; i++) {
+ if (!statements[i].isEmpty()) {
+ executeCql(statements[i]);
+ }
+ }
+ }
+
+ public void executeCql(String cql) {
+ getSession().execute(cql);
+ }
+
+ @AfterEach
+ protected void doPostTearDown() throws Exception {
+
+ try {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ } catch (Exception e) {
+ // ignored
+ }
+ }
+
+ public CqlSession getSession() {
+ if (session == null) {
+ InetSocketAddress endpoint
+ = new InetSocketAddress(service.getCassandraHost(), service.getCQL3Port());
+ //create a new session
+ session = CqlSession.builder()
+ .withLocalDatacenter(DATACENTER_NAME)
+ .withKeyspace(KEYSPACE_NAME)
+ .withConfigLoader(DriverConfigLoader.programmaticBuilder()
+ .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5)).build())
+ .addContactPoint(endpoint).build();
+ }
+ return session;
+ }
+
+ public String getUrl() {
+ return service.getCQL3Endpoint();
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java
new file mode 100644
index 0000000..07fbcca
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraAggregationIT.class,
+ CassandraAggregationIT.TestConfiguration.class
+ }
+)
+public class CassandraAggregationIT extends BaseCassandra {
+
+ CassandraAggregationRepository aggregationRepository;
+
+
+ @EndpointInject("mock:output")
+ MockEndpoint mockOutput;
+
+ @BeforeEach
+ protected void doPreSetup() throws Exception {
+ aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID");
+ aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+ aggregationRepository.start();
+
+ }
+
+
+ @AfterEach
+ public void tearDown() throws Exception {
+
+ aggregationRepository.stop();
+ }
+
+ private void send(String aggregationId, String body) {
+ super.template.sendBodyAndHeader("direct:input", body, "aggregationId", aggregationId);
+ }
+
+ @Test
+ public void testAggregationRoute() throws Exception {
+ // Given
+
+ mockOutput.expectedMessageCount(2);
+ mockOutput.expectedBodiesReceivedInAnyOrder("A,C,E", "B,D");
+ // When
+ send("1", "A");
+ send("2", "B");
+ send("1", "C");
+ send("2", "D");
+ send("1", "E");
+ // Then
+ mockOutput.assertIsSatisfied(4000L);
+ }
+
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ AggregationStrategy aggregationStrategy = new AggregationStrategy() {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String oldBody = oldExchange.getIn().getBody(String.class);
+ String newBody = newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(oldBody + "," + newBody);
+ return oldExchange;
+ }
+ };
+ from("direct:input").aggregate(header("aggregationId"), aggregationStrategy).completionSize(3)
+ .completionTimeout(3000L).aggregationRepository(aggregationRepository)
+ .to("mock:output");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java
new file mode 100644
index 0000000..ba3e3ff
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import java.util.Set;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.support.DefaultExchange;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraAggregationRepositoryIT.class
+ }
+)
+public class CassandraAggregationRepositoryIT extends BaseCassandra {
+
+ CassandraAggregationRepository aggregationRepository;
+
+
+ @EndpointInject("mock:output")
+ MockEndpoint mockOutput;
+
+ @BeforeEach
+ protected void doPreSetup() throws Exception {
+ aggregationRepository = new CassandraAggregationRepository(getSession());
+ aggregationRepository.start();
+
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ aggregationRepository.stop();
+ }
+
+ private boolean exists(String key) {
+ return getSession().execute(String.format("select KEY from CAMEL_AGGREGATION where KEY='%s'", key)).one() != null;
+ }
+
+ @Test
+ public void testAdd() {
+ // Given
+ String key = "Add";
+ assertFalse(exists(key));
+ Exchange exchange = new DefaultExchange(context);
+ // When
+ aggregationRepository.add(context, key, exchange);
+ // Then
+ assertTrue(exists(key));
+ }
+
+ @Test
+ public void testGetExists() {
+ // Given
+ String key = "Get_Exists";
+ Exchange exchange = new DefaultExchange(context);
+ aggregationRepository.add(context, key, exchange);
+ assertTrue(exists(key));
+ // When
+ Exchange exchange2 = aggregationRepository.get(context, key);
+ // Then
+ assertNotNull(exchange2);
+ assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+ }
+
+ @Test
+ public void testGetNotExists() {
+ // Given
+ String key = "Get_NotExists";
+ assertFalse(exists(key));
+ // When
+ Exchange exchange2 = aggregationRepository.get(context, key);
+ // Then
+ assertNull(exchange2);
+ }
+
+ @Test
+ public void testRemoveExists() {
+ // Given
+ String key = "Remove_Exists";
+ Exchange exchange = new DefaultExchange(context);
+ aggregationRepository.add(context, key, exchange);
+ assertTrue(exists(key));
+ // When
+ aggregationRepository.remove(context, key, exchange);
+ // Then
+ assertFalse(exists(key));
+ }
+
+ @Test
+ public void testRemoveNotExists() {
+ // Given
+ String key = "RemoveNotExists";
+ Exchange exchange = new DefaultExchange(context);
+ assertFalse(exists(key));
+ // When
+ aggregationRepository.remove(context, key, exchange);
+ // Then
+ assertFalse(exists(key));
+ }
+
+ @Test
+ public void testGetKeys() {
+ // Given
+ String[] keys = { "GetKeys1", "GetKeys2" };
+ addExchanges(keys);
+ // When
+ Set<String> keySet = aggregationRepository.getKeys();
+ // Then
+ for (String key : keys) {
+ assertTrue(keySet.contains(key));
+ }
+ }
+
+ @Test
+ public void testConfirmExist() {
+ // Given
+ for (int i = 1; i < 4; i++) {
+ String key = "Confirm_" + i;
+ Exchange exchange = new DefaultExchange(context);
+ exchange.setExchangeId("Exchange_" + i);
+ aggregationRepository.add(context, key, exchange);
+ assertTrue(exists(key));
+ }
+ // When
+ aggregationRepository.confirm(context, "Exchange_2");
+ // Then
+ assertTrue(exists("Confirm_1"));
+ assertFalse(exists("Confirm_2"));
+ assertTrue(exists("Confirm_3"));
+ }
+
+ @Test
+ public void testConfirmNotExist() {
+ // Given
+ String[] keys = new String[3];
+ for (int i = 1; i < 4; i++) {
+ keys[i - 1] = "Confirm" + i;
+ }
+ addExchanges(keys);
+ for (String key : keys) {
+ assertTrue(exists(key));
+ }
+ // When
+ aggregationRepository.confirm(context, "Exchange-Confirm5");
+ // Then
+ for (String key : keys) {
+ assertTrue(exists(key));
+ }
+ }
+
+ private void addExchanges(String... keys) {
+ for (String key : keys) {
+ Exchange exchange = new DefaultExchange(context);
+ exchange.setExchangeId("Exchange-" + key);
+ aggregationRepository.add(context, key, exchange);
+ }
+ }
+
+ @Test
+ public void testScan() {
+ // Given
+ String[] keys = { "Scan1", "Scan2" };
+ addExchanges(keys);
+ // When
+ Set<String> exchangeIdSet = aggregationRepository.scan(context);
+ // Then
+ for (String key : keys) {
+ assertTrue(exchangeIdSet.contains("Exchange-" + key));
+ }
+ }
+
+ @Test
+ public void testRecover() {
+ // Given
+ String[] keys = { "Recover1", "Recover2" };
+ addExchanges(keys);
+ // When
+ Exchange exchange2 = aggregationRepository.recover(context, "Exchange-Recover2");
+ Exchange exchange3 = aggregationRepository.recover(context, "Exchange-Recover3");
+ // Then
+ assertNotNull(exchange2);
+ assertNull(exchange3);
+ }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java
new file mode 100644
index 0000000..3d6c7cc
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.util.HeaderDto;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraAggregationSerializedHeadersIT.class,
+ CassandraAggregationSerializedHeadersIT.TestConfiguration.class
+ }
+)
+public class CassandraAggregationSerializedHeadersIT extends BaseCassandra {
+
+ CassandraAggregationRepository aggregationRepository;
+
+
+ @EndpointInject("mock:output")
+ MockEndpoint mockOutput;
+
+ @BeforeEach
+ protected void doPreSetup() throws Exception {
+ aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID");
+ aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+ aggregationRepository.setAllowSerializedHeaders(true);
+ aggregationRepository.start();
+
+ }
+
+
+ @AfterEach
+ public void tearDown() throws Exception {
+
+ aggregationRepository.stop();
+ }
+
+ private void send(HeaderDto aggregationId, String body) {
+ template.sendBodyAndHeader("direct:input", body, "aggregationId", aggregationId);
+ }
+
+ @Test
+ public void testAggregationRoute() throws Exception {
+ // Given
+
+ mockOutput.expectedMessageCount(2);
+ mockOutput.expectedBodiesReceivedInAnyOrder("A,C,E", "B,D");
+ HeaderDto dto1 = new HeaderDto("org", "company", 1);
+ HeaderDto dto2 = new HeaderDto("org", "company", 2);
+ // When
+ send(dto1, "A");
+ send(dto2, "B");
+ send(dto1, "C");
+ send(dto2, "D");
+ send(dto1, "E");
+ // Then
+ mockOutput.assertIsSatisfied(4000L);
+
+ }
+
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ AggregationStrategy aggregationStrategy = new AggregationStrategy() {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String oldBody = oldExchange.getIn().getBody(String.class);
+ String newBody = newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(oldBody + "," + newBody);
+ return oldExchange;
+ }
+ };
+ from("direct:input").aggregate(header("aggregationId"), aggregationStrategy).completionSize(3)
+ .completionTimeout(3000L).aggregationRepository(aggregationRepository)
+ .to("mock:output");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java
new file mode 100644
index 0000000..d1afc1c
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import java.util.Set;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.support.DefaultExchange;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ NamedCassandraAggregationRepositoryIT.class
+ }
+)
+public class NamedCassandraAggregationRepositoryIT extends BaseCassandra {
+
+ CassandraAggregationRepository aggregationRepository;
+
+
+ @EndpointInject("mock:output")
+ MockEndpoint mockOutput;
+
+ @BeforeEach
+ protected void doPreSetup() throws Exception {
+ aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID");
+ aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+ aggregationRepository.start();
+
+ }
+
+
+ @AfterEach
+ public void tearDown() throws Exception {
+
+ aggregationRepository.stop();
+ }
+
+ private boolean exists(String key) {
+ return getSession().execute(String.format("select KEY from NAMED_CAMEL_AGGREGATION where NAME='ID' and KEY='%s'", key))
+ .one()
+ != null;
+ }
+
+ @Test
+ public void testAdd() {
+ // Given
+ String key = "Add";
+ assertFalse(exists(key));
+ Exchange exchange = new DefaultExchange(context);
+ // When
+ aggregationRepository.add(context, key, exchange);
+ // Then
+ assertTrue(exists(key));
+ }
+
+ @Test
+ public void testGetExists() {
+ // Given
+ String key = "Get_Exists";
+ Exchange exchange = new DefaultExchange(context);
+ aggregationRepository.add(context, key, exchange);
+ assertTrue(exists(key));
+ // When
+ Exchange exchange2 = aggregationRepository.get(context, key);
+ // Then
+ assertNotNull(exchange2);
+ assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+ }
+
+ @Test
+ public void testGetNotExists() {
+ // Given
+ String key = "Get_NotExists";
+ assertFalse(exists(key));
+ // When
+ Exchange exchange2 = aggregationRepository.get(context, key);
+ // Then
+ assertNull(exchange2);
+ }
+
+ @Test
+ public void testRemoveExists() {
+ // Given
+ String key = "Remove_Exists";
+ Exchange exchange = new DefaultExchange(context);
+ aggregationRepository.add(context, key, exchange);
+ assertTrue(exists(key));
+ // When
+ aggregationRepository.remove(context, key, exchange);
+ // Then
+ assertFalse(exists(key));
+ }
+
+ @Test
+ public void testRemoveNotExists() {
+ // Given
+ String key = "RemoveNotExists";
+ Exchange exchange = new DefaultExchange(context);
+ assertFalse(exists(key));
+ // When
+ aggregationRepository.remove(context, key, exchange);
+ // Then
+ assertFalse(exists(key));
+ }
+
+ @Test
+ public void testGetKeys() {
+ // Given
+ String[] keys = { "GetKeys1", "GetKeys2" };
+ addExchanges(keys);
+ // When
+ Set<String> keySet = aggregationRepository.getKeys();
+ // Then
+ for (String key : keys) {
+ assertTrue(keySet.contains(key));
+ }
+ }
+
+ @Test
+ public void testConfirmExist() {
+ // Given
+ for (int i = 1; i < 4; i++) {
+ String key = "Confirm_" + i;
+ Exchange exchange = new DefaultExchange(context);
+ exchange.setExchangeId("Exchange_" + i);
+ aggregationRepository.add(context, key, exchange);
+ assertTrue(exists(key));
+ }
+ // When
+ aggregationRepository.confirm(context, "Exchange_2");
+ // Then
+ assertTrue(exists("Confirm_1"));
+ assertFalse(exists("Confirm_2"));
+ assertTrue(exists("Confirm_3"));
+ }
+
+ @Test
+ public void testConfirmNotExist() {
+ // Given
+ String[] keys = new String[3];
+ for (int i = 1; i < 4; i++) {
+ keys[i - 1] = "Confirm" + i;
+ }
+ addExchanges(keys);
+ for (String key : keys) {
+ assertTrue(exists(key));
+ }
+ // When
+ aggregationRepository.confirm(context, "Exchange-Confirm5");
+ // Then
+ for (String key : keys) {
+ assertTrue(exists(key));
+ }
+ }
+
+ private void addExchanges(String... keys) {
+ for (String key : keys) {
+ Exchange exchange = new DefaultExchange(context);
+ exchange.setExchangeId("Exchange-" + key);
+ aggregationRepository.add(context, key, exchange);
+ }
+ }
+
+ @Test
+ public void testScan() {
+ // Given
+ String[] keys = { "Scan1", "Scan2" };
+ addExchanges(keys);
+ // When
+ Set<String> exchangeIdSet = aggregationRepository.scan(context);
+ // Then
+ for (String key : keys) {
+ assertTrue(exchangeIdSet.contains("Exchange-" + key));
+ }
+ }
+
+ @Test
+ public void testRecover() {
+ // Given
+ String[] keys = { "Recover1", "Recover2" };
+ addExchanges(keys);
+ // When
+ Exchange exchange2 = aggregationRepository.recover(context, "Exchange-Recover2");
+ Exchange exchange3 = aggregationRepository.recover(context, "Exchange-Recover3");
+ // Then
+ assertNotNull(exchange2);
+ assertNull(exchange3);
+ }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java
new file mode 100644
index 0000000..6010643
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.util;
+
+import java.io.Serializable;
+
+public class HeaderDto implements Cloneable, Serializable {
+
+ private static final long serialVersionUID = -5004840651888298047L;
+ private String org;
+ private String type;
+ private int key;
+
+ public HeaderDto(String org, String type, int key) {
+ this.org = org;
+ this.type = type;
+ this.key = key;
+ }
+
+ public int getKey() {
+ return key;
+ }
+
+ public void setKey(int key) {
+ this.key = key;
+ }
+
+ public String getOrg() {
+ return org;
+ }
+
+ public void setOrg(String org) {
+ this.org = org;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString() {
+ return "HeaderDto [org=" + org + ", type=" + type + ", key=" + key + "]";
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java
new file mode 100644
index 0000000..2f54a58
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraIdempotentIT.class,
+ CassandraIdempotentIT.TestConfiguration.class
+ }
+)
+public class CassandraIdempotentIT extends BaseCassandra {
+
+ CassandraIdempotentRepository idempotentRepository;
+
+
+ @EndpointInject("mock:output")
+ MockEndpoint mockOutput;
+
+
+
+
+ private void send(String idempotentId, String body) {
+ super.template.sendBodyAndHeader("direct:input", body, "idempotentId", idempotentId);
+ }
+
+ @Test
+ public void testIdempotentRoute() throws Exception {
+ // Given
+
+ mockOutput.expectedMessageCount(2);
+ mockOutput.expectedBodiesReceivedInAnyOrder("A", "B");
+ // When
+ send("1", "A");
+ send("2", "B");
+ send("1", "A");
+ send("2", "B");
+ send("1", "A");
+ // Then
+ mockOutput.assertIsSatisfied();
+
+ }
+
+ // *************************************
+ // Config
+ // *************************************
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ idempotentRepository = new NamedCassandraIdempotentRepository(getSession(), "ID");
+ idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+ idempotentRepository.start();
+ from("direct:input").idempotentConsumer(header("idempotentId"), idempotentRepository).to("mock:output");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java
new file mode 100644
index 0000000..3313cc8
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ CassandraIdempotentRepositoryIT.class
+ }
+)
+public class CassandraIdempotentRepositoryIT extends BaseCassandra {
+
+ CassandraIdempotentRepository idempotentRepository;
+
+
+
+
+
+ @BeforeEach
+ protected void doPreSetup() throws Exception {
+ idempotentRepository = new CassandraIdempotentRepository(getSession());
+ idempotentRepository.start();
+
+ }
+
+ @Override
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ executeScript("IdempotentDataSet.cql");
+ }
+
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ idempotentRepository.stop();
+ }
+
+ private boolean exists(String key) {
+ return getSession().execute(String.format("select KEY from CAMEL_IDEMPOTENT where KEY='%s'", key)).one() != null;
+ }
+
+ @Test
+ public void testAddNotExists() {
+ // Given
+ String key = "Add_NotExists";
+ assertFalse(exists(key));
+ // When
+ boolean result = idempotentRepository.add(key);
+ // Then
+ assertTrue(result);
+ assertTrue(exists(key));
+ }
+
+ @Test
+ public void testAddExists() {
+ // Given
+ String key = "Add_Exists";
+ assertTrue(exists(key));
+ // When
+ boolean result = idempotentRepository.add(key);
+ // Then
+ assertFalse(result);
+ assertTrue(exists(key));
+ }
+
+ @Test
+ public void testContainsNotExists() {
+ // Given
+ String key = "Contains_NotExists";
+ assertFalse(exists(key));
+ // When
+ boolean result = idempotentRepository.contains(key);
+ // Then
+ assertFalse(result);
+ }
+
+ @Test
+ public void testContainsExists() {
+ // Given
+ String key = "Contains_Exists";
+ assertTrue(exists(key));
+ // When
+ boolean result = idempotentRepository.contains(key);
+ // Then
+ assertTrue(result);
+ }
+
+ @Test
+ public void testRemoveNotExists() {
+ // Given
+ String key = "Remove_NotExists";
+ assertFalse(exists(key));
+ // When
+ boolean result = idempotentRepository.contains(key);
+ // Then
+ assertFalse(result);
+ }
+
+ @Test
+ public void testRemoveExists() {
+ // Given
+ String key = "Remove_Exists";
+ assertTrue(exists(key));
+ // When
+ boolean result = idempotentRepository.remove(key);
+ // Then
+ assertTrue(result);
+ }
+
+ @Test
+ public void testClear() {
+ // Given
+ String key = "Remove_Exists";
+ assertTrue(exists(key));
+ // When
+ idempotentRepository.clear();
+ // Then
+ assertFalse(idempotentRepository.contains(key));
+ }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java
new file mode 100644
index 0000000..38a224b
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ NamedCassandraIdempotentRepositoryIT.class
+ }
+)
+public class NamedCassandraIdempotentRepositoryIT extends BaseCassandra {
+
+ CassandraIdempotentRepository idempotentRepository;
+
+
+
+
+
+ @BeforeEach
+ protected void doPreSetup() throws Exception {
+ idempotentRepository = new NamedCassandraIdempotentRepository(getSession(), "ID");
+ idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+ idempotentRepository.start();
+
+ }
+
+ @Override
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ executeScript("NamedIdempotentDataSet.cql");
+ }
+
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ idempotentRepository.stop();
+ }
+
+ private boolean exists(String key) {
+ return getSession().execute(String.format("select KEY from NAMED_CAMEL_IDEMPOTENT where NAME='ID' and KEY='%s'", key))
+ .one()
+ != null;
+ }
+
+ @Test
+ public void testAddNotExists() {
+ // Given
+ String key = "Add_NotExists";
+ assertFalse(exists(key));
+ // When
+ boolean result = idempotentRepository.add(key);
+ // Then
+ assertTrue(result);
+ assertTrue(exists(key));
+ }
+
+ @Test
+ public void testAddExists() {
+ // Given
+ String key = "Add_Exists";
+ assertTrue(exists(key));
+ // When
+ boolean result = idempotentRepository.add(key);
+ // Then
+ assertFalse(result);
+ assertTrue(exists(key));
+ }
+
+ @Test
+ public void testContainsNotExists() {
+ // Given
+ String key = "Contains_NotExists";
+ assertFalse(exists(key));
+ // When
+ boolean result = idempotentRepository.contains(key);
+ // Then
+ assertFalse(result);
+ }
+
+ @Test
+ public void testContainsExists() {
+ // Given
+ String key = "Contains_Exists";
+ assertTrue(exists(key));
+ // When
+ boolean result = idempotentRepository.contains(key);
+ // Then
+ assertTrue(result);
+ }
+
+ @Test
+ public void testRemoveNotExists() {
+ // Given
+ String key = "Remove_NotExists";
+ assertFalse(exists(key));
+ // When
+ boolean result = idempotentRepository.contains(key);
+ // Then
+ assertFalse(result);
+ }
+
+ @Test
+ public void testRemoveExists() {
+ // Given
+ String key = "Remove_Exists";
+ assertTrue(exists(key));
+ // When
+ boolean result = idempotentRepository.remove(key);
+ // Then
+ assertTrue(result);
+ }
+
+ @Test
+ public void testClear() {
+ // Given
+ String key = "Remove_Exists";
+ assertTrue(exists(key));
+ // When
+ idempotentRepository.clear();
+ // Then
+ assertFalse(idempotentRepository.contains(key));
+ }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql
new file mode 100644
index 0000000..9054e40
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql
@@ -0,0 +1,6 @@
+TRUNCATE camel_user;
+
+INSERT INTO camel_user(login, first_name, last_name)
+ VALUES('j_strachan','James','Strachan');
+INSERT INTO camel_user(login, first_name, last_name)
+ VALUES('c_ibsen','Claus','Ibsen');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql
new file mode 100644
index 0000000..2d4b974
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql
@@ -0,0 +1,8 @@
+TRUNCATE CAMEL_IDEMPOTENT;
+
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+ VALUES('Add_Exists');
+INSERT INTO camel_ks.CAMEL_IDEMPOTENT(KEY)
+ VALUES('Contains_Exists');
+INSERT INTO camel_ks.CAMEL_IDEMPOTENT(KEY)
+ VALUES('Remove_Exists');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql
new file mode 100644
index 0000000..a8adebd
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql
@@ -0,0 +1,8 @@
+TRUNCATE NAMED_CAMEL_IDEMPOTENT;
+
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+ VALUES('ID','Add_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+ VALUES('ID','Contains_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+ VALUES('ID','Remove_Exists');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql b/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql
new file mode 100644
index 0000000..0253f9b
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql
@@ -0,0 +1,33 @@
+CREATE KEYSPACE IF NOT EXISTS camel_ks WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
+
+CREATE TABLE camel_ks.camel_user (
+ login varchar PRIMARY KEY,
+ first_name varchar,
+ last_name varchar
+);
+
+CREATE TABLE camel_ks.NAMED_CAMEL_AGGREGATION (
+ NAME varchar,
+ KEY varchar,
+ EXCHANGE_ID varchar,
+ EXCHANGE blob,
+ PRIMARY KEY (NAME, KEY)
+);
+
+CREATE TABLE camel_ks.NAMED_CAMEL_IDEMPOTENT (
+ NAME varchar,
+ KEY varchar,
+ PRIMARY KEY (NAME, KEY)
+);
+
+CREATE TABLE camel_ks.CAMEL_IDEMPOTENT (
+ KEY varchar,
+ PRIMARY KEY (KEY)
+);
+
+CREATE TABLE camel_ks.CAMEL_AGGREGATION (
+ KEY varchar,
+ EXCHANGE_ID varchar,
+ EXCHANGE blob,
+ PRIMARY KEY (KEY)
+);