You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ff...@apache.org on 2022/03/08 22:43:35 UTC

[camel-spring-boot] branch main updated: [CAMEL-17758]add tests in camel-cassandraql-starter (#458)

This is an automated email from the ASF dual-hosted git repository.

ffang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new f43a4b4  [CAMEL-17758]add tests in camel-cassandraql-starter (#458)
f43a4b4 is described below

commit f43a4b40a226ba3a2a28b8716230669c5e5e150a
Author: Freeman(Yue) Fang <fr...@gmail.com>
AuthorDate: Tue Mar 8 17:38:51 2022 -0500

    [CAMEL-17758]add tests in camel-cassandraql-starter (#458)
    
    (cherry picked from commit c7335a6467079c6bc1901037681d351597834752)
---
 .../camel-cassandraql-starter/pom.xml              |  70 ++++++
 .../cassandra/MockLoadBalancingPolicy.java         |  43 ++++
 .../integration/CassandraComponentBeanRefIT.java   |  92 ++++++++
 .../integration/CassandraComponentConsumerIT.java  | 134 ++++++++++++
 .../integration/CassandraComponentProducerIT.java  | 234 +++++++++++++++++++++
 .../CassandraComponentProducerUnpreparedIT.java    | 156 ++++++++++++++
 .../cassandra/springboot/BaseCassandra.java        | 113 ++++++++++
 .../cassandra/CassandraAggregationIT.java          | 123 +++++++++++
 .../CassandraAggregationRepositoryIT.java          | 222 +++++++++++++++++++
 .../CassandraAggregationSerializedHeadersIT.java   | 128 +++++++++++
 .../NamedCassandraAggregationRepositoryIT.java     | 227 ++++++++++++++++++++
 .../camel/processor/aggregate/util/HeaderDto.java  |  62 ++++++
 .../cassandra/CassandraIdempotentIT.java           |  96 +++++++++
 .../cassandra/CassandraIdempotentRepositoryIT.java | 153 ++++++++++++++
 .../NamedCassandraIdempotentRepositoryIT.java      | 155 ++++++++++++++
 .../src/test/resources/BasicDataSet.cql            |   6 +
 .../src/test/resources/IdempotentDataSet.cql       |   8 +
 .../src/test/resources/NamedIdempotentDataSet.cql  |   8 +
 .../src/test/resources/initScript.cql              |  33 +++
 19 files changed, 2063 insertions(+)

diff --git a/components-starter/camel-cassandraql-starter/pom.xml b/components-starter/camel-cassandraql-starter/pom.xml
index 347a961..36a0f63 100644
--- a/components-starter/camel-cassandraql-starter/pom.xml
+++ b/components-starter/camel-cassandraql-starter/pom.xml
@@ -39,6 +39,14 @@
       <artifactId>camel-cassandraql</artifactId>
       <version>${camel-version}</version>
     </dependency>
+    <!-- test infra -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-infra-cassandra</artifactId>
+      <version>${camel-version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>com.datastax.oss</groupId>
@@ -56,4 +64,66 @@
     </dependency>
     <!--END OF GENERATED CODE-->
   </dependencies>
+  <profiles>
+    <!-- activate integration test if the docker socket file is accessible -->
+    <profile>
+      <id>cassandraql-integration-tests-docker-file</id>
+      <activation>
+        <file>
+          <exists>/var/run/docker.sock</exists>
+        </file>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <configuration>
+              <systemPropertyVariables>
+                <visibleassertions.silence>true</visibleassertions.silence>
+              </systemPropertyVariables>
+            </configuration>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <!-- activate integration test if the DOCKER_HOST env var is set -->
+    <profile>
+      <id>cassandraql-integration-tests-docker-env</id>
+      <activation>
+        <property>
+          <name>env.DOCKER_HOST</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <configuration>
+              <systemPropertyVariables>
+                <visibleassertions.silence>true</visibleassertions.silence>
+              </systemPropertyVariables>
+            </configuration>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java
new file mode 100644
index 0000000..8a68c07
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/MockLoadBalancingPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra;
+
+import java.util.Queue;
+
+import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.session.Request;
+import com.datastax.oss.driver.api.core.session.Session;
+import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+public class MockLoadBalancingPolicy extends DefaultLoadBalancingPolicy {
+    // Test double: delegates to the default policy and flags every request for a query plan.
+    public static volatile boolean used; // volatile: written in newQueryPlan, read by a test assertion elsewhere
+
+    public MockLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
+        super(context, profileName);
+    }
+
+    @NonNull
+    @Override
+    public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
+        MockLoadBalancingPolicy.used = true; // record that this policy was consulted before delegating
+        return super.newQueryPlan(request, session);
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java
new file mode 100644
index 0000000..7237352
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentBeanRefIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+
+
+import com.datastax.oss.driver.api.core.CqlSession;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraEndpoint;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraComponentBeanRefIT.class,
+        CassandraComponentBeanRefIT.TestConfiguration.class
+    }
+)
+public class CassandraComponentBeanRefIT extends BaseCassandra {
+    // Checks that a cql endpoint can be resolved from a URI that references a CqlSession bean by name.
+    public static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+    public static final String SESSION_URI = "cql:bean:cassandraSession?cql=" + CQL;
+
+    @Bean("cassandraSession")
+    protected CqlSession createSession() {
+        // publish whatever getSession() returns under the bean name used in SESSION_URI
+        return getSession();
+    }
+    
+    
+   
+
+    @Test
+    public void testSession() {
+        // resolve the endpoint by its bean-reference URI and inspect its configuration
+        
+        CassandraEndpoint endpoint = context.getEndpoint(SESSION_URI, CassandraEndpoint.class);
+        assertNotNull(endpoint, "No endpoint found for uri: " + SESSION_URI);
+        // NOTE(review): the URI carries no keyspace, so this value presumably comes from the session - confirm
+        assertEquals(KEYSPACE_NAME, endpoint.getKeyspace());
+        assertEquals(CQL, endpoint.getCql());
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:inputSession").to(SESSION_URI);
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java
new file mode 100644
index 0000000..676da47
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentConsumerIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.oss.driver.api.core.cql.Row;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraComponentConsumerIT.class,
+        CassandraComponentConsumerIT.TestConfiguration.class
+    }
+)
+public class CassandraComponentConsumerIT extends BaseCassandra {
+    // Checks the body type delivered by three cql consumer routes: List, List (unprepared) and a single Row.
+    static final String CQL = "select login, first_name, last_name from camel_user";
+    
+    @EndpointInject("mock:resultAll")
+    MockEndpoint mock;
+    
+    @EndpointInject("mock:resultUnprepared")
+    MockEndpoint mockResulutUnprepared;
+
+    @EndpointInject("mock:resultOne")
+    MockEndpoint mockResulutOne;
+   
+
+    @Test
+    public void testConsumeAll() throws Exception {
+        // default route: at least one exchange must arrive and every body must be a List
+        mock.expectedMinimumMessageCount(1);
+        mock.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                Object body = exchange.getIn().getBody();
+                assertTrue(body instanceof List);
+            }
+        });
+        mock.await(1, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testConsumeUnprepared() throws Exception {
+        // prepareStatements=false route: same expectation as the default route
+        mockResulutUnprepared.expectedMinimumMessageCount(1);
+        mockResulutUnprepared.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                Object body = exchange.getIn().getBody();
+                assertTrue(body instanceof List);
+            }
+        });
+        mockResulutUnprepared.await(1, TimeUnit.SECONDS);
+        mockResulutUnprepared.assertIsSatisfied();
+    }
+
+    @Test
+    public void testConsumeOne() throws Exception {
+        // resultSetConversionStrategy=ONE route: every body must be a single Row
+        mockResulutOne.expectedMinimumMessageCount(1);
+        mockResulutOne.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                Object body = exchange.getIn().getBody();
+                assertTrue(body instanceof Row);
+            }
+        });
+        // wait on the endpoint under test (mock:resultOne), not on mock:resultAll
+        mockResulutOne.await(1, TimeUnit.SECONDS);
+        mockResulutOne.assertIsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)).to("mock:resultAll");
+                    from(String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL))
+                            .to("mock:resultUnprepared");
+                    from(String.format("cql://%s/%s?cql=%s&resultSetConversionStrategy=ONE", getUrl(), KEYSPACE_NAME, CQL))
+                            .to("mock:resultOne");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java
new file mode 100644
index 0000000..f17e502
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerIT.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.update.Update;
+
+
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraConstants;
+import org.apache.camel.component.cassandra.CassandraEndpoint;
+import org.apache.camel.component.cassandra.MockLoadBalancingPolicy;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraComponentProducerIT.class,
+        CassandraComponentProducerIT.TestConfiguration.class
+    }
+)
+public class CassandraComponentProducerIT extends BaseCassandra {
+    // Sends messages to the cql producer routes defined below and verifies the outcome by querying the session.
+    static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+    static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user";
+
+    @Produce("direct:input")
+    ProducerTemplate producerTemplate;
+
+    @Produce("direct:inputNoParameter")
+    ProducerTemplate noParameterProducerTemplate;
+
+    @Produce("direct:inputNotConsistent")
+    ProducerTemplate notConsistentProducerTemplate;
+
+    @Produce("direct:loadBalancingPolicy")
+    ProducerTemplate loadBalancingPolicyTemplate;
+
+    @Produce("direct:inputNoEndpointCql")
+    ProducerTemplate producerTemplateNoEndpointCql;
+   
+
+    @Test
+    public void testRequestUriCql() {
+        producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang"));
+        // read the row back through the session to check that the insert from the endpoint URI ran
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "w_jiang"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Willem", row.getString("first_name"));
+        assertEquals("Jiang", row.getString("last_name"));
+    }
+
+    @Test
+    public void testRequestNoParameterNull() {
+        Object response = noParameterProducerTemplate.requestBody(null);
+
+        assertNotNull(response);
+        assertIsInstanceOf(List.class, response);
+    }
+
+    @Test
+    public void testRequestNoParameterEmpty() {
+        Object response = noParameterProducerTemplate.requestBody(Collections.emptyList());
+
+        assertNotNull(response);
+        assertIsInstanceOf(List.class, response);
+    }
+
+    @Test
+    public void testRequestMessageCql() {
+        producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+                "update camel_user set first_name=?, last_name=? where login=?");
+        // the update passed in the CQL_QUERY header is expected to take effect instead of the URI's insert
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+    }
+
+    @Test
+    public void testLoadBalancing() {
+        loadBalancingPolicyTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" },
+                CassandraConstants.CQL_QUERY,
+                "update camel_user set first_name=?, last_name=? where login=?");
+
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+        // the flag is set by MockLoadBalancingPolicy.newQueryPlan, i.e. the configured policy class was used
+        Assertions.assertTrue(MockLoadBalancingPolicy.used);
+    }
+
+    /**
+     * Test with incoming message containing a header with RegularStatement.
+     */
+    @Test
+    public void testRequestMessageStatement() {
+        // the statement uses bind markers; the values to bind travel in the message body
+        Update update = QueryBuilder.update("camel_user")
+                .setColumn("first_name", bindMarker())
+                .setColumn("last_name", bindMarker())
+                .whereColumn("login").isEqualTo(bindMarker());
+        producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+                update.build());
+
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+    }
+
+    /**
+     * Simulate different CQL statements in the incoming message containing a header with RegularStatement, justifying
+     * the cassandraql endpoint not containing a "cql" Uri parameter
+     */
+    @Test
+    public void testEndpointNoCqlParameter() {
+        Update update = QueryBuilder.update("camel_user")
+                .setColumn("first_name", bindMarker())
+                .whereColumn("login").isEqualTo(bindMarker());
+        producerTemplateNoEndpointCql.sendBodyAndHeader(new Object[] { "Claus 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+                update.build());
+        // NOTE(review): "Ibsen" below presumably comes from a data set loaded per test outside this class - confirm
+        ResultSet resultSet1 = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row1 = resultSet1.one();
+        assertNotNull(row1);
+        assertEquals("Claus 2", row1.getString("first_name"));
+        assertEquals("Ibsen", row1.getString("last_name"));
+
+        update = QueryBuilder.update("camel_user")
+                .setColumn("last_name", bindMarker())
+                .whereColumn("login").isEqualTo(bindMarker());
+        producerTemplateNoEndpointCql.sendBodyAndHeader(new Object[] { "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+                update.build());
+
+        ResultSet resultSet2 = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row2 = resultSet2.one();
+        assertNotNull(row2);
+        assertEquals("Claus 2", row2.getString("first_name"));
+        assertEquals("Ibsen 2", row2.getString("last_name"));
+    }
+
+    @Test
+    public void testRequestNotConsistent() {
+        CassandraEndpoint endpoint
+                = context.getEndpoint(String.format("cql://%s/%s?cql=%s&consistencyLevel=ANY", getUrl(), KEYSPACE_NAME, CQL),
+                        CassandraEndpoint.class);
+        assertEquals(ConsistencyLevel.ANY, endpoint.getConsistencyLevel());
+        // no assertion on the result: the request only has to complete without throwing
+        notConsistentProducerTemplate.requestBody(Arrays.asList("j_anstey", "Jonathan", "Anstey"));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+
+                    from("direct:input").to(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL));
+                    from("direct:inputNoParameter")
+                            .to(String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL));
+                    from("direct:loadBalancingPolicy").to(String.format(
+                            "cql://%s/%s?cql=%s&loadBalancingPolicyClass=org.apache.camel.component.cassandra.MockLoadBalancingPolicy",
+                            getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL));
+                    from("direct:inputNotConsistent")
+                            .to(String.format("cql://%s/%s?cql=%s&consistencyLevel=ANY", getUrl(), KEYSPACE_NAME, CQL));
+                    from("direct:inputNoEndpointCql").to(String.format("cql://%s/%s", getUrl(), KEYSPACE_NAME));
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java
new file mode 100644
index 0000000..c8bc4f1
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentProducerUnpreparedIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.update.Update;
+
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraConstants;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraComponentProducerUnpreparedIT.class,
+        CassandraComponentProducerUnpreparedIT.TestConfiguration.class
+    }
+)
+public class CassandraComponentProducerUnpreparedIT extends BaseCassandra {
+
+    static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+    static final String NO_PARAMETER_CQL = "select login, first_name, last_name from camel_user";
+
+    
+    
+    
+    @Produce("direct:input")
+    ProducerTemplate producerTemplate;
+
+    @Produce("direct:inputNoParameter")
+    ProducerTemplate noParameterProducerTemplate;
+   
+
+    @Test
+    public void testRequestUriCql() {
+        producerTemplate.requestBody(Arrays.asList("w_jiang", "Willem", "Jiang"));
+
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "w_jiang"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Willem", row.getString("first_name"));
+        assertEquals("Jiang", row.getString("last_name"));
+    }
+
+    @Test
+    public void testRequestNoParameterNull() {
+        Object response = noParameterProducerTemplate.requestBody(null);
+
+        assertNotNull(response);
+        assertIsInstanceOf(List.class, response);
+    }
+
+    @Test
+    public void testRequestNoParameterEmpty() {
+        Object response = noParameterProducerTemplate.requestBody(null);
+
+        assertNotNull(response);
+        assertIsInstanceOf(List.class, response);
+    }
+
+    @Test
+    public void testRequestMessageCql() {
+        producerTemplate.requestBodyAndHeader(new Object[] { "Claus 2", "Ibsen 2", "c_ibsen" }, CassandraConstants.CQL_QUERY,
+                "update camel_user set first_name=?, last_name=? where login=?");
+
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+    }
+
+    /**
+     * Test with incoming message containing a header with RegularStatement.
+     */
+    @Test
+    public void testRequestMessageStatement() {
+        Update update = QueryBuilder.update("camel_user")
+                .setColumn("first_name", literal("Claus 2"))
+                .setColumn("last_name", literal("Ibsen 2"))
+                .whereColumn("login").isEqualTo(literal("c_ibsen"));
+        producerTemplate.requestBodyAndHeader(null, CassandraConstants.CQL_QUERY, update.build());
+
+        ResultSet resultSet = getSession()
+                .execute(String.format("select login, first_name, last_name from camel_user where login = '%s'", "c_ibsen"));
+        Row row = resultSet.one();
+        assertNotNull(row);
+        assertEquals("Claus 2", row.getString("first_name"));
+        assertEquals("Ibsen 2", row.getString("last_name"));
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+
+                    from("direct:input")
+                            .to(String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL));
+                    from("direct:inputNoParameter").to(
+                            String.format("cql://%s/%s?cql=%s&prepareStatements=false", getUrl(), KEYSPACE_NAME, NO_PARAMETER_CQL));
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java
new file mode 100644
index 0000000..dd26263
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/component/cassandra/springboot/BaseCassandra.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.springboot;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.test.infra.cassandra.services.CassandraLocalContainerService;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+/**
+ * Base class for the Cassandra integration tests.
+ * <p>
+ * Holds the statically registered container service, loads {@code BasicDataSet.cql} before every test and
+ * hands out a lazily opened {@link CqlSession} that is closed again after every test.
+ */
+public class BaseCassandra {
+
+    @Autowired
+    protected CamelContext context;
+
+    @Autowired
+    protected ProducerTemplate template;
+
+    @RegisterExtension
+    public static CassandraLocalContainerService service;
+
+    public static final String KEYSPACE_NAME = "camel_ks";
+    public static final String DATACENTER_NAME = "datacenter1";
+
+    /** Session of the current test; {@code null} until {@link #getSession()} is first called. */
+    private CqlSession session;
+
+    static {
+        service = new CassandraLocalContainerService();
+
+        service.getContainer()
+                .withInitScript("initScript.cql")
+                .withNetworkAliases("cassandra");
+    }
+
+    /** Loads the basic data set before every test. */
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        executeScript("BasicDataSet.cql");
+    }
+
+    /**
+     * Executes every statement of a CQL script found on the classpath.
+     * <p>
+     * The script is split on {@code ';'}. Fragments that contain only whitespace (such as the line break
+     * after the last statement) are skipped instead of being sent to the server.
+     *
+     * @param  pathToScript location of the script, relative to the classpath root
+     * @throws IOException  if the script cannot be found or read
+     */
+    public void executeScript(String pathToScript) throws IOException {
+        String script;
+        // java.io.InputStream is spelled out because this file does not import it.
+        try (java.io.InputStream in = getClass().getResourceAsStream("/" + pathToScript)) {
+            if (in == null) {
+                // getResourceAsStream signals a missing resource with null; report it by name
+                throw new IOException("CQL script not found on classpath: /" + pathToScript);
+            }
+            script = IOUtils.toString(in, "UTF-8");
+        }
+        for (String statement : script.split(";")) {
+            if (!statement.trim().isEmpty()) {
+                executeCql(statement);
+            }
+        }
+    }
+
+    /**
+     * Executes a single CQL statement on the test session.
+     *
+     * @param cql the statement to execute
+     */
+    public void executeCql(String cql) {
+        getSession().execute(cql);
+    }
+
+    /** Closes the session of the finished test, if one was opened, and forgets it. */
+    @AfterEach
+    protected void doPostTearDown() throws Exception {
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } catch (Exception ignored) {
+            // best effort: a failing close must not fail the test that has just run
+        } finally {
+            // always drop the reference so that the next test opens a new session
+            session = null;
+        }
+    }
+
+    /**
+     * Returns the session of the current test, opening it on first use.
+     *
+     * @return a session bound to {@link #KEYSPACE_NAME} with a five second request timeout
+     */
+    public CqlSession getSession() {
+        if (session == null) {
+            InetSocketAddress endpoint
+                    = new InetSocketAddress(service.getCassandraHost(), service.getCQL3Port());
+            DriverConfigLoader configLoader = DriverConfigLoader.programmaticBuilder()
+                    .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5))
+                    .build();
+            session = CqlSession.builder()
+                    .withLocalDatacenter(DATACENTER_NAME)
+                    .withKeyspace(KEYSPACE_NAME)
+                    .withConfigLoader(configLoader)
+                    .addContactPoint(endpoint)
+                    .build();
+        }
+        return session;
+    }
+
+    /**
+     * Returns the CQL3 endpoint reported by the container service.
+     *
+     * @return the value of {@code service.getCQL3Endpoint()}
+     */
+    public String getUrl() {
+        return service.getCQL3Endpoint();
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java
new file mode 100644
index 0000000..07fbcca
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraAggregationIT.class,
+        CassandraAggregationIT.TestConfiguration.class
+    }
+)
+public class CassandraAggregationIT extends BaseCassandra {
+    // Sends five bodies through an aggregating route and expects them grouped by their "aggregationId" header.
+    CassandraAggregationRepository aggregationRepository;
+
+    // Mock endpoint at the end of the route; the test sets its expectations on it.
+    @EndpointInject("mock:output")
+    MockEndpoint mockOutput;
+
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID");
+        aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+        aggregationRepository.start();
+        // the repository is stopped again in tearDown()
+    }
+
+    // Stops the repository that doPreSetup() started.
+    @AfterEach
+    public void tearDown() throws Exception {
+    
+        aggregationRepository.stop();
+    }
+
+    private void send(String aggregationId, String body) {
+        super.template.sendBodyAndHeader("direct:input", body, "aggregationId", aggregationId);
+    }
+
+    @Test
+    public void testAggregationRoute() throws Exception {
+        // Given: two aggregated messages are expected, in either order
+        
+        mockOutput.expectedMessageCount(2);
+        mockOutput.expectedBodiesReceivedInAnyOrder("A,C,E", "B,D");
+        // When: three bodies are sent for id "1" and two for id "2", interleaved
+        send("1", "A");
+        send("2", "B");
+        send("1", "C");
+        send("2", "D");
+        send("1", "E");
+        // Then: id "1" reaches completionSize(3); id "2" has only two bodies, so it needs the 3000 ms timeout
+        mockOutput.assertIsSatisfied(4000L);
+    }
+    
+
+    // *************************************
+    // Config
+    // *************************************
+    // Registers the route under test: direct:input -> aggregate on header "aggregationId" -> mock:output.
+    @Configuration
+    public class TestConfiguration {
+        // NOTE(review): the route reads aggregationRepository, which is only set in @BeforeEach; it may still be null when built -- TODO confirm.
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    AggregationStrategy aggregationStrategy = new AggregationStrategy() {
+                        @Override
+                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                            if (oldExchange == null) { // no previous exchange: the new one is returned unchanged
+                                return newExchange;
+                            }
+                            String oldBody = oldExchange.getIn().getBody(String.class);
+                            String newBody = newExchange.getIn().getBody(String.class);
+                            oldExchange.getIn().setBody(oldBody + "," + newBody); // bodies are joined with a comma
+                            return oldExchange;
+                        }
+                    };
+                    from("direct:input").aggregate(header("aggregationId"), aggregationStrategy).completionSize(3)
+                            .completionTimeout(3000L).aggregationRepository(aggregationRepository)
+                            .to("mock:output");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java
new file mode 100644
index 0000000..ba3e3ff
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepositoryIT.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import java.util.Set;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.support.DefaultExchange;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+/**
+ * Integration tests that call {@link CassandraAggregationRepository} directly (no route involved) and check
+ * the outcome by querying the {@code CAMEL_AGGREGATION} table through the test session.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraAggregationRepositoryIT.class
+    }
+)
+public class CassandraAggregationRepositoryIT extends BaseCassandra {
+
+    CassandraAggregationRepository aggregationRepository;
+
+    @EndpointInject("mock:output")
+    MockEndpoint mockOutput;
+
+    /** Creates and starts a repository on the test session before every test. */
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        aggregationRepository = new CassandraAggregationRepository(getSession());
+        aggregationRepository.start();
+    }
+
+    /** Stops the repository created by {@link #doPreSetup()}. */
+    @AfterEach
+    public void tearDown() throws Exception {
+        aggregationRepository.stop();
+    }
+
+    /** Tells whether the aggregation table holds a row for the given key. */
+    private boolean exists(String key) {
+        String query = String.format("select KEY from CAMEL_AGGREGATION where KEY='%s'", key);
+        return getSession().execute(query).one() != null;
+    }
+
+    /** Asserts that every given key has a row in the aggregation table. */
+    private void assertAllExist(String... keys) {
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+    }
+
+    /** Stores one exchange per key, using {@code "Exchange-" + key} as its exchange id. */
+    private void addExchanges(String... keys) {
+        for (int i = 0; i < keys.length; i++) {
+            Exchange stored = new DefaultExchange(context);
+            stored.setExchangeId("Exchange-" + keys[i]);
+            aggregationRepository.add(context, keys[i], stored);
+        }
+    }
+
+    @Test
+    public void testAdd() {
+        final String key = "Add";
+        assertFalse(exists(key));
+
+        aggregationRepository.add(context, key, new DefaultExchange(context));
+
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testGetExists() {
+        final String key = "Get_Exists";
+        Exchange stored = new DefaultExchange(context);
+        aggregationRepository.add(context, key, stored);
+        assertTrue(exists(key));
+
+        Exchange loaded = aggregationRepository.get(context, key);
+
+        // the loaded exchange must carry the id of the one that was stored
+        assertNotNull(loaded);
+        assertEquals(stored.getExchangeId(), loaded.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        final String key = "Get_NotExists";
+        assertFalse(exists(key));
+
+        assertNull(aggregationRepository.get(context, key));
+    }
+
+    @Test
+    public void testRemoveExists() {
+        final String key = "Remove_Exists";
+        Exchange stored = new DefaultExchange(context);
+        aggregationRepository.add(context, key, stored);
+        assertTrue(exists(key));
+
+        aggregationRepository.remove(context, key, stored);
+
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        final String key = "RemoveNotExists";
+        Exchange neverStored = new DefaultExchange(context);
+        assertFalse(exists(key));
+
+        // removing an unknown key must leave the table without a row for it
+        aggregationRepository.remove(context, key, neverStored);
+
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        String[] keys = { "GetKeys1", "GetKeys2" };
+        addExchanges(keys);
+
+        Set<String> storedKeys = aggregationRepository.getKeys();
+
+        for (int i = 0; i < keys.length; i++) {
+            assertTrue(storedKeys.contains(keys[i]));
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // three rows Confirm_1..3 holding the exchanges Exchange_1..3
+        for (int n = 1; n <= 3; n++) {
+            String key = "Confirm_" + n;
+            Exchange stored = new DefaultExchange(context);
+            stored.setExchangeId("Exchange_" + n);
+            aggregationRepository.add(context, key, stored);
+            assertTrue(exists(key));
+        }
+
+        aggregationRepository.confirm(context, "Exchange_2");
+
+        // only the row of the confirmed exchange is gone
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        String[] keys = { "Confirm1", "Confirm2", "Confirm3" };
+        addExchanges(keys);
+        assertAllExist(keys);
+
+        // confirming an exchange id that was never stored must not remove anything
+        aggregationRepository.confirm(context, "Exchange-Confirm5");
+
+        assertAllExist(keys);
+    }
+
+    @Test
+    public void testScan() {
+        addExchanges("Scan1", "Scan2");
+
+        Set<String> exchangeIds = aggregationRepository.scan(context);
+
+        assertTrue(exchangeIds.contains("Exchange-Scan1"));
+        assertTrue(exchangeIds.contains("Exchange-Scan2"));
+    }
+
+    @Test
+    public void testRecover() {
+        addExchanges("Recover1", "Recover2");
+
+        Exchange known = aggregationRepository.recover(context, "Exchange-Recover2");
+        Exchange unknown = aggregationRepository.recover(context, "Exchange-Recover3");
+
+        assertNotNull(known);
+        assertNull(unknown);
+    }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java
new file mode 100644
index 0000000..3d6c7cc
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationSerializedHeadersIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.util.HeaderDto;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraAggregationSerializedHeadersIT.class,
+        CassandraAggregationSerializedHeadersIT.TestConfiguration.class
+    }
+)
+public class CassandraAggregationSerializedHeadersIT extends BaseCassandra {
+    // Same aggregation scenario as a plain string id, but the "aggregationId" header value is a HeaderDto object.
+    CassandraAggregationRepository aggregationRepository;
+
+    // Mock endpoint at the end of the route; the test sets its expectations on it.
+    @EndpointInject("mock:output")
+    MockEndpoint mockOutput;
+
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID");
+        aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+        aggregationRepository.setAllowSerializedHeaders(true);
+        aggregationRepository.start();
+        // the repository is stopped again in tearDown()
+    }
+
+    // Stops the repository that doPreSetup() started.
+    @AfterEach
+    public void tearDown() throws Exception {
+    
+        aggregationRepository.stop();
+    }
+
+    private void send(HeaderDto aggregationId, String body) {
+        template.sendBodyAndHeader("direct:input", body, "aggregationId", aggregationId);
+    }
+
+    @Test
+    public void testAggregationRoute() throws Exception {
+        // Given: two aggregated messages are expected, in either order; the two header objects differ only in key
+        
+        mockOutput.expectedMessageCount(2);
+        mockOutput.expectedBodiesReceivedInAnyOrder("A,C,E", "B,D");
+        HeaderDto dto1 = new HeaderDto("org", "company", 1);
+        HeaderDto dto2 = new HeaderDto("org", "company", 2);
+        // When: three bodies are sent with dto1 and two with dto2, interleaved
+        send(dto1, "A");
+        send(dto2, "B");
+        send(dto1, "C");
+        send(dto2, "D");
+        send(dto1, "E");
+        // Then: the dto1 group reaches completionSize(3); the dto2 group has two bodies, so it needs the 3000 ms timeout
+        mockOutput.assertIsSatisfied(4000L);
+
+    }
+    
+
+    // *************************************
+    // Config
+    // *************************************
+    // Registers the route under test: direct:input -> aggregate on header "aggregationId" -> mock:output.
+    @Configuration
+    public class TestConfiguration {
+        // NOTE(review): the route reads aggregationRepository, which is only set in @BeforeEach; it may still be null when built -- TODO confirm.
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    AggregationStrategy aggregationStrategy = new AggregationStrategy() {
+                        @Override
+                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                            if (oldExchange == null) { // no previous exchange: the new one is returned unchanged
+                                return newExchange;
+                            }
+                            String oldBody = oldExchange.getIn().getBody(String.class);
+                            String newBody = newExchange.getIn().getBody(String.class);
+                            oldExchange.getIn().setBody(oldBody + "," + newBody); // bodies are joined with a comma
+                            return oldExchange;
+                        }
+                    };
+                    from("direct:input").aggregate(header("aggregationId"), aggregationStrategy).completionSize(3)
+                            .completionTimeout(3000L).aggregationRepository(aggregationRepository)
+                            .to("mock:output");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java
new file mode 100644
index 0000000..d1afc1c
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepositoryIT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+
+import java.util.Set;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.support.DefaultExchange;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+/**
+ * Integration tests for {@link NamedCassandraAggregationRepository}, invoked directly rather than through a route.
+ * <p>
+ * The repository is constructed with {@code "ID"} on table {@code NAMED_CAMEL_AGGREGATION}; stored rows are
+ * verified with a plain CQL query that filters on {@code NAME='ID'}.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        NamedCassandraAggregationRepositoryIT.class
+    }
+)
+public class NamedCassandraAggregationRepositoryIT extends BaseCassandra {
+
+    CassandraAggregationRepository aggregationRepository;
+
+    @EndpointInject("mock:output")
+    MockEndpoint mockOutput;
+
+    /** Builds, configures and starts the repository under test before each test method. */
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        aggregationRepository = new NamedCassandraAggregationRepository(getSession(), "ID");
+        aggregationRepository.setTable("NAMED_CAMEL_AGGREGATION");
+        aggregationRepository.start();
+    }
+
+    /** Stops the repository after each test method. */
+    @AfterEach
+    public void tearDown() throws Exception {
+        aggregationRepository.stop();
+    }
+
+    /** Looks the key up in the table and reports whether a row came back. */
+    private boolean exists(String key) {
+        String cql = String.format("select KEY from NAMED_CAMEL_AGGREGATION where NAME='ID' and KEY='%s'", key);
+        Object firstRow = getSession().execute(cql).one();
+        return firstRow != null;
+    }
+
+    @Test
+    public void testAdd() {
+        // Given: no row for the key yet
+        String key = "Add";
+        boolean presentBefore = exists(key);
+        assertFalse(presentBefore);
+        Exchange toStore = new DefaultExchange(context);
+        // When: the exchange is added
+        aggregationRepository.add(context, key, toStore);
+        // Then: the row is there
+        boolean presentAfter = exists(key);
+        assertTrue(presentAfter);
+    }
+
+    @Test
+    public void testGetExists() {
+        // Given: a stored exchange
+        String key = "Get_Exists";
+        Exchange original = new DefaultExchange(context);
+        aggregationRepository.add(context, key, original);
+        assertTrue(exists(key));
+        // When: it is fetched by key
+        Exchange fetched = aggregationRepository.get(context, key);
+        // Then: the same exchange id comes back
+        assertNotNull(fetched);
+        String expectedId = original.getExchangeId();
+        assertEquals(expectedId, fetched.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        // Given: a key that has no row
+        String key = "Get_NotExists";
+        assertFalse(exists(key));
+        // When: it is fetched
+        Exchange fetched = aggregationRepository.get(context, key);
+        // Then: nothing is returned
+        assertNull(fetched);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given: a stored exchange
+        String key = "Remove_Exists";
+        Exchange original = new DefaultExchange(context);
+        aggregationRepository.add(context, key, original);
+        assertTrue(exists(key));
+        // When: it is removed
+        aggregationRepository.remove(context, key, original);
+        // Then: its row is gone
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given: an exchange that was never stored
+        String key = "RemoveNotExists";
+        Exchange unsaved = new DefaultExchange(context);
+        assertFalse(exists(key));
+        // When: removal is requested anyway
+        aggregationRepository.remove(context, key, unsaved);
+        // Then: there is still no row
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        // Given: two stored exchanges
+        String[] expectedKeys = { "GetKeys1", "GetKeys2" };
+        addExchanges(expectedKeys);
+        // When: all keys are listed
+        Set<String> listedKeys = aggregationRepository.getKeys();
+        // Then: both keys are among them
+        for (String expected : expectedKeys) {
+            boolean listed = listedKeys.contains(expected);
+            assertTrue(listed);
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // Given: rows Confirm_1..3 holding exchanges Exchange_1..3
+        for (int index = 1; index < 4; index++) {
+            String key = "Confirm_" + index;
+            String exchangeId = "Exchange_" + index;
+            Exchange toStore = new DefaultExchange(context);
+            toStore.setExchangeId(exchangeId);
+            aggregationRepository.add(context, key, toStore);
+            assertTrue(exists(key));
+        }
+        // When: the second exchange is confirmed
+        aggregationRepository.confirm(context, "Exchange_2");
+        // Then: only its row has disappeared
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        // Given: rows Confirm1..3
+        String[] keys = new String[3];
+        for (int slot = 0; slot < keys.length; slot++) {
+            keys[slot] = "Confirm" + (slot + 1);
+        }
+        addExchanges(keys);
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+        // When: an exchange id that was never stored is confirmed
+        aggregationRepository.confirm(context, "Exchange-Confirm5");
+        // Then: all three rows are still present
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+    }
+
+    /** Adds one exchange per key; the exchange id is the key prefixed with {@code "Exchange-"}. */
+    private void addExchanges(String... keys) {
+        for (String key : keys) {
+            String exchangeId = "Exchange-" + key;
+            Exchange toStore = new DefaultExchange(context);
+            toStore.setExchangeId(exchangeId);
+            aggregationRepository.add(context, key, toStore);
+        }
+    }
+
+    @Test
+    public void testScan() {
+        // Given: two stored exchanges
+        String[] keys = { "Scan1", "Scan2" };
+        addExchanges(keys);
+        // When: the repository is scanned
+        Set<String> scannedIds = aggregationRepository.scan(context);
+        // Then: both exchange ids are reported
+        for (String key : keys) {
+            String expectedId = "Exchange-" + key;
+            assertTrue(scannedIds.contains(expectedId));
+        }
+    }
+
+    @Test
+    public void testRecover() {
+        // Given: exchanges for Recover1 and Recover2 only
+        String[] keys = { "Recover1", "Recover2" };
+        addExchanges(keys);
+        // When: one stored and one unknown exchange id are recovered
+        Exchange recovered = aggregationRepository.recover(context, "Exchange-Recover2");
+        Exchange missing = aggregationRepository.recover(context, "Exchange-Recover3");
+        // Then: only the stored one comes back
+        assertNotNull(recovered);
+        assertNull(missing);
+    }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java
new file mode 100644
index 0000000..6010643
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/aggregate/util/HeaderDto.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.util;
+
+import java.io.Serializable;
+
+/**
+ * Serializable test DTO made of an organisation, a type and an integer key.
+ * <p>
+ * Its {@link #toString()} lists all three values in the form {@code HeaderDto [org=..., type=..., key=...]}.
+ */
+public class HeaderDto implements Cloneable, Serializable {
+
+    private static final long serialVersionUID = -5004840651888298047L;
+    private String org;
+    private String type;
+    private int key;
+
+    /**
+     * Creates a DTO with all three values set.
+     *
+     * @param org  the organisation value
+     * @param type the type value
+     * @param key  the numeric key
+     */
+    public HeaderDto(String org, String type, int key) {
+        this.org = org;
+        this.type = type;
+        this.key = key;
+    }
+
+    /** @return the organisation value */
+    public String getOrg() {
+        return this.org;
+    }
+
+    /** @param org the new organisation value */
+    public void setOrg(String org) {
+        this.org = org;
+    }
+
+    /** @return the type value */
+    public String getType() {
+        return this.type;
+    }
+
+    /** @param type the new type value */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /** @return the numeric key */
+    public int getKey() {
+        return this.key;
+    }
+
+    /** @param key the new numeric key */
+    public void setKey(int key) {
+        this.key = key;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("HeaderDto [org=");
+        text.append(org);
+        text.append(", type=").append(type);
+        text.append(", key=").append(key);
+        text.append("]");
+        return text.toString();
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java
new file mode 100644
index 0000000..2f54a58
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraIdempotentIT.class,
+        CassandraIdempotentIT.TestConfiguration.class
+    }
+)
+public class CassandraIdempotentIT extends BaseCassandra {
+
+    CassandraIdempotentRepository idempotentRepository;
+
+    
+    @EndpointInject("mock:output")
+    MockEndpoint mockOutput;
+
+
+   
+
+    private void send(String idempotentId, String body) {
+        super.template.sendBodyAndHeader("direct:input", body, "idempotentId", idempotentId);
+    }
+
+    @Test
+    public void testIdempotentRoute() throws Exception {
+        // Given: mock:output expects exactly two messages, with bodies "A" and "B" in any order
+        
+        mockOutput.expectedMessageCount(2);
+        mockOutput.expectedBodiesReceivedInAnyOrder("A", "B");
+        // When: five messages are sent to direct:input using only two distinct idempotentId values
+        send("1", "A");
+        send("2", "B");
+        send("1", "A");
+        send("2", "B");
+        send("1", "A");
+        // Then: the expectations hold, i.e. the three repeated ids never reached mock:output
+        mockOutput.assertIsSatisfied();
+
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    idempotentRepository = new NamedCassandraIdempotentRepository(getSession(), "ID");
+                    idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+                    idempotentRepository.start();
+                    from("direct:input").idempotentConsumer(header("idempotentId"), idempotentRepository).to("mock:output");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java
new file mode 100644
index 0000000..3313cc8
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepositoryIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        CassandraIdempotentRepositoryIT.class
+    }
+)
+public class CassandraIdempotentRepositoryIT extends BaseCassandra {
+
+    CassandraIdempotentRepository idempotentRepository;
+
+    
+       
+
+    
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        idempotentRepository = new CassandraIdempotentRepository(getSession());
+        idempotentRepository.start();
+
+    }
+
+    @Override
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        executeScript("IdempotentDataSet.cql");
+    }
+
+    
+    @AfterEach
+    public void tearDown() throws Exception {
+        idempotentRepository.stop();
+    }
+
+    private boolean exists(String key) {
+        return getSession().execute(String.format("select KEY from CAMEL_IDEMPOTENT where KEY='%s'", key)).one() != null;
+    }
+
+    @Test
+    public void testAddNotExists() {
+        // Given: a key with no row in CAMEL_IDEMPOTENT
+        String key = "Add_NotExists";
+        assertFalse(exists(key));
+        // When: the key is added through the repository
+        boolean result = idempotentRepository.add(key);
+        // Then: add() reports true and the row is now present
+        assertTrue(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testAddExists() {
+        // Given: a key seeded by IdempotentDataSet.cql
+        String key = "Add_Exists";
+        assertTrue(exists(key));
+        // When: the same key is added again
+        boolean result = idempotentRepository.add(key);
+        // Then: add() reports false and the existing row is still present
+        assertFalse(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testContainsNotExists() {
+        // Given: a key with no row in CAMEL_IDEMPOTENT
+        String key = "Contains_NotExists";
+        assertFalse(exists(key));
+        // When: the repository is asked whether it holds the key
+        boolean result = idempotentRepository.contains(key);
+        // Then: contains() reports false
+        assertFalse(result);
+    }
+
+    @Test
+    public void testContainsExists() {
+        // Given: a key seeded by IdempotentDataSet.cql
+        String key = "Contains_Exists";
+        assertTrue(exists(key));
+        // When: the repository is asked whether it holds the key
+        boolean result = idempotentRepository.contains(key);
+        // Then: contains() reports true
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given: a key with no row in CAMEL_IDEMPOTENT
+        String key = "Remove_NotExists";
+        assertFalse(exists(key));
+        // When: removal of the missing key is attempted (remove(), not contains(), is under test)
+        boolean result = idempotentRepository.remove(key);
+        // Then: nothing was deleted, so remove() must report false
+        assertFalse(result);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given: a key seeded by IdempotentDataSet.cql
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When: the key is removed through the repository
+        boolean result = idempotentRepository.remove(key);
+        // Then: remove() reports true
+        assertTrue(result);
+    }
+
+    @Test
+    public void testClear() {
+        // Given: a seeded key (the "Remove_Exists" row is reused here as a known existing entry)
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When: the repository is cleared
+        idempotentRepository.clear();
+        // Then: the repository no longer reports the key
+        assertFalse(idempotentRepository.contains(key));
+    }
+
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java
new file mode 100644
index 0000000..38a224b
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepositoryIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+
+import org.apache.camel.component.cassandra.springboot.BaseCassandra;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        NamedCassandraIdempotentRepositoryIT.class
+    }
+)
+public class NamedCassandraIdempotentRepositoryIT extends BaseCassandra {
+
+    CassandraIdempotentRepository idempotentRepository;
+
+    
+       
+
+    
+    @BeforeEach
+    protected void doPreSetup() throws Exception {
+        idempotentRepository = new NamedCassandraIdempotentRepository(getSession(), "ID");
+        idempotentRepository.setTable("NAMED_CAMEL_IDEMPOTENT");
+        idempotentRepository.start();
+
+    }
+
+    @Override
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        executeScript("NamedIdempotentDataSet.cql");
+    }
+
+    
+    @AfterEach
+    public void tearDown() throws Exception {
+        idempotentRepository.stop();
+    }
+
+    private boolean exists(String key) {
+        return getSession().execute(String.format("select KEY from NAMED_CAMEL_IDEMPOTENT where NAME='ID' and KEY='%s'", key))
+                .one()
+               != null;
+    }
+
+    @Test
+    public void testAddNotExists() {
+        // Given: a key with no row under NAME='ID' in NAMED_CAMEL_IDEMPOTENT
+        String key = "Add_NotExists";
+        assertFalse(exists(key));
+        // When: the key is added through the repository
+        boolean result = idempotentRepository.add(key);
+        // Then: add() reports true and the row is now present
+        assertTrue(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testAddExists() {
+        // Given: a key seeded by NamedIdempotentDataSet.cql
+        String key = "Add_Exists";
+        assertTrue(exists(key));
+        // When: the same key is added again
+        boolean result = idempotentRepository.add(key);
+        // Then: add() reports false and the existing row is still present
+        assertFalse(result);
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testContainsNotExists() {
+        // Given: a key with no row under NAME='ID' in NAMED_CAMEL_IDEMPOTENT
+        String key = "Contains_NotExists";
+        assertFalse(exists(key));
+        // When: the repository is asked whether it holds the key
+        boolean result = idempotentRepository.contains(key);
+        // Then: contains() reports false
+        assertFalse(result);
+    }
+
+    @Test
+    public void testContainsExists() {
+        // Given: a key seeded by NamedIdempotentDataSet.cql
+        String key = "Contains_Exists";
+        assertTrue(exists(key));
+        // When: the repository is asked whether it holds the key
+        boolean result = idempotentRepository.contains(key);
+        // Then: contains() reports true
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given: a key with no row under NAME='ID' in NAMED_CAMEL_IDEMPOTENT
+        String key = "Remove_NotExists";
+        assertFalse(exists(key));
+        // When: removal of the missing key is attempted (remove(), not contains(), is under test)
+        boolean result = idempotentRepository.remove(key);
+        // Then: nothing was deleted, so remove() must report false
+        assertFalse(result);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given: a key seeded by NamedIdempotentDataSet.cql
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When: the key is removed through the repository
+        boolean result = idempotentRepository.remove(key);
+        // Then: remove() reports true
+        assertTrue(result);
+    }
+
+    @Test
+    public void testClear() {
+        // Given: a seeded key (the "Remove_Exists" row is reused here as a known existing entry)
+        String key = "Remove_Exists";
+        assertTrue(exists(key));
+        // When: the repository is cleared
+        idempotentRepository.clear();
+        // Then: the repository no longer reports the key
+        assertFalse(idempotentRepository.contains(key));
+    }
+}
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql
new file mode 100644
index 0000000..9054e40
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/BasicDataSet.cql
@@ -0,0 +1,6 @@
+TRUNCATE camel_user;
+
+INSERT INTO camel_user(login, first_name, last_name)
+    VALUES('j_strachan','James','Strachan');
+INSERT INTO camel_user(login, first_name, last_name)
+    VALUES('c_ibsen','Claus','Ibsen');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql
new file mode 100644
index 0000000..2d4b974
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/IdempotentDataSet.cql
@@ -0,0 +1,8 @@
+TRUNCATE CAMEL_IDEMPOTENT;
+
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Add_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Contains_Exists');
+INSERT INTO CAMEL_IDEMPOTENT(KEY)
+    VALUES('Remove_Exists');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql b/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql
new file mode 100644
index 0000000..a8adebd
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/NamedIdempotentDataSet.cql
@@ -0,0 +1,8 @@
+TRUNCATE NAMED_CAMEL_IDEMPOTENT;
+
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Add_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Contains_Exists');
+INSERT INTO NAMED_CAMEL_IDEMPOTENT(NAME, KEY)
+    VALUES('ID','Remove_Exists');
\ No newline at end of file
diff --git a/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql b/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql
new file mode 100644
index 0000000..0253f9b
--- /dev/null
+++ b/components-starter/camel-cassandraql-starter/src/test/resources/initScript.cql
@@ -0,0 +1,33 @@
+CREATE KEYSPACE IF NOT EXISTS camel_ks WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
+
+CREATE TABLE IF NOT EXISTS camel_ks.camel_user (
+  login varchar PRIMARY KEY,
+  first_name varchar,
+  last_name varchar
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.NAMED_CAMEL_AGGREGATION (
+  NAME varchar,
+  KEY varchar,
+  EXCHANGE_ID varchar,
+  EXCHANGE blob,
+  PRIMARY KEY (NAME, KEY)
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.NAMED_CAMEL_IDEMPOTENT (
+  NAME varchar,
+  KEY varchar,
+  PRIMARY KEY (NAME, KEY)
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.CAMEL_IDEMPOTENT (
+  KEY varchar,
+  PRIMARY KEY (KEY)
+);
+
+CREATE TABLE IF NOT EXISTS camel_ks.CAMEL_AGGREGATION (
+  KEY varchar,
+  EXCHANGE_ID varchar,
+  EXCHANGE blob,
+  PRIMARY KEY (KEY)
+);