You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/23 12:25:02 UTC

[2/5] camel git commit: CAMEL-2939: Camel Cassandra 3 CQL3 first edition

CAMEL-2939: Camel Cassandra 3 CQL3 first edition

Handle Cassandra authentication

Improve ResultSet to message body conversion

Added ReadMe.md

Update ReadMe.md

Rename package

Upgrade dependencies

Implement IdempotentRepository using Cassandra

Implement AggregationRepository using Cassandra

Renamed CassandraQl into Cassandra

Refactored CQL generation

Removed PKHelper in favour of generateInsert, generateDelete... methods


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7be546af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7be546af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7be546af

Branch: refs/heads/master
Commit: 7be546af5c889b0fd7afbfdc2ef52c4a60d9879c
Parents: dade396
Author: Gerald Quintana <ge...@gmail.com>
Authored: Sat Sep 20 21:08:01 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Dec 23 11:21:05 2014 +0100

----------------------------------------------------------------------
 components/camel-cassandraql/ReadMe.md          |  43 ++
 components/camel-cassandraql/pom.xml            |  90 +++
 .../component/cassandra/CassandraComponent.java | 131 ++++
 .../component/cassandra/CassandraConstants.java |  28 +
 .../component/cassandra/CassandraConsumer.java  |  55 ++
 .../component/cassandra/CassandraEndpoint.java  | 138 ++++
 .../component/cassandra/CassandraProducer.java  |  83 ++
 .../ResultSetConversionStrategies.java          | 103 +++
 .../cassandra/ResultSetConversionStrategy.java  |  26 +
 .../CassandraAggregationException.java          |  31 +
 .../CassandraAggregationRepository.java         | 345 +++++++++
 .../cassandra/CassandraCamelCodec.java          | 111 +++
 .../NamedCassandraAggregationRepository.java    |  58 ++
 .../CassandraIdempotentRepository.java          | 212 ++++++
 .../NamedCassandraIdempotentRepository.java     |  59 ++
 .../utils/cassandra/CassandraSessionHolder.java |  82 ++
 .../camel/utils/cassandra/CassandraUtils.java   | 163 ++++
 .../services/org/apache/camel/component/cql     |   1 +
 .../CassandraComponentBeanRefTest.java          |  77 ++
 .../CassandraComponentClusterBuilderTest.java   |  74 ++
 .../CassandraComponentConsumerTest.java         |  73 ++
 .../CassandraComponentProducerTest.java         |  87 +++
 .../component/cassandra/CassandraUnitUtils.java |  77 ++
 .../ResultSetConversionStrategiesTest.java      |  73 ++
 .../CassandraAggregationRepositoryTest.java     | 182 +++++
 .../cassandra/CassandraAggregationTest.java     | 104 +++
 .../CassandraIdempotentRepositoryTest.java      | 129 ++++
 .../cassandra/CassandraIdempotentTest.java      |  82 ++
 .../src/test/resources/AggregationDataSet.cql   |   7 +
 .../src/test/resources/BasicDataSet.cql         |   9 +
 .../src/test/resources/IdempotentDataSet.cql    |  11 +
 .../src/test/resources/camel-cassandra.yaml     | 762 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  17 +
 components/pom.xml                              |   1 +
 34 files changed, 3524 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/ReadMe.md
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/ReadMe.md b/components/camel-cassandraql/ReadMe.md
new file mode 100644
index 0000000..f149384
--- /dev/null
+++ b/components/camel-cassandraql/ReadMe.md
@@ -0,0 +1,43 @@
+# Camel Cassandra Component
+
+This component aims at integrating Cassandra 2.0+ using the CQL3 API (not the Thrift API).
+It's based on [Cassandra Java Driver](https://github.com/datastax/java-driver) provided by DataStax.
+
+## URI
+
+### Examples
+
+| URI                              | Description
+|----------------------------------|----------------------------------
+|`cql:localhost/keyspace`          | single host, default port, usual for testing
+|`cql:host1,host2/keyspace`        | multi host, default port
+|`cql:host1,host2:9042/keyspace`   | multi host, custom port
+|`cql:host1,host2`                 | multi host, default port and keyspace
+|`cql:bean:sessionRef`             | provided Session reference
+|`cql:bean:clusterRef/keyspace`    | provided Cluster reference
+
+### Options
+
+| Option                           | Description
+|----------------------------------|----------------------------------
+|`clusterName`                     | cluster name
+|`username and password`           | session authentication
+|`cql`                             | CQL query
+|`consistencyLevel`                | `ANY`, `ONE`, `TWO`, `QUORUM`, `LOCAL_QUORUM`...
+|`resultSetConversionStrategy`     | how the ResultSet is converted into the message body: `ALL`, `ONE`, `LIMIT_10`, `LIMIT_100`...
+
+## Message
+
+### Incoming
+
+Headers:
+* `CamelCqlQuery` (optional, String): CQL query
+
+Body
+* (`Object[]` or `Collection<Object>`): CQL query parameters to be bound
+
+### Outgoing
+
+Body
+* `List<Row>` if resultSetConversionStrategy is `ALL` or `LIMIT_<n>` (for example `LIMIT_10`)
+* `Row` if resultSetConversionStrategy is ONE

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/pom.xml b/components/camel-cassandraql/pom.xml
new file mode 100644
index 0000000..88d6e84
--- /dev/null
+++ b/components/camel-cassandraql/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.15-SNAPSHOT</version>
+  </parent>
+  
+  <artifactId>camel-cassandraql</artifactId>
+  <packaging>bundle</packaging>
+
+  <name>Camel :: Cassandra CQL</name>
+  <description>Cassandra CQL3 support</description>
+  
+  <properties> <!-- OSGi bundle settings: exported packages and exported component resolver service -->
+    <camel.osgi.export.pkg>
+      org.apache.camel.component.cassandra.*
+    </camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=cql</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+
+    <!-- logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <!-- cassandra -->
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>2.1.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.cassandraunit</groupId>
+      <artifactId>cassandra-unit</artifactId>
+      <version>2.0.2.2</version>
+      <scope>test</scope>
+      <exclusions> <!-- cassandra-all is declared explicitly as its own test dependency -->
+        <exclusion>
+          <groupId>org.apache.cassandra</groupId>
+          <artifactId>cassandra-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>cassandra-all</artifactId>
+      <version>2.1.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+      <version>1.14</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java
new file mode 100644
index 0000000..761a5d7
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java
@@ -0,0 +1,131 @@
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.EndpointHelper;
+
+/**
+ * Represents the component that manages {@link CassandraEndpoint}. This
+ * component is based on Datastax Java Driver for Cassandra.
+ *
+ * URI examples:
+ * <ul>
+ * <li>cql:localhost/keyspace</li>
+ * <li>cql:host1,host2/keyspace</li>
+ * <li>cql:host1,host2:9042/keyspace</li>
+ * <li>cql:host1,host2</li>
+ * <li>cql:bean:sessionRef</li>
+ * <li>cql:bean:clusterRef/keyspace</li>
+ * </ul>
+ */
+public class CassandraComponent extends DefaultComponent {
+    /**
+     * Regular expression for parsing host name
+     */
+    private static final String HOST_PATTERN = "[\\w.\\-]+";
+    /**
+     * Regular expression for parsing several hosts name
+     */
+    private static final String HOSTS_PATTERN = HOST_PATTERN + "(?:," + HOST_PATTERN + ")*";
+    /**
+     * Regular expression for parsing port
+     */
+    private static final String PORT_PATTERN = "\\d+";
+    /**
+     * Regular expression for parsing keyspace
+     */
+    private static final String KEYSPACE_PATTERN = "\\w+";
+    /**
+     * Regular expression for parsing URI host1,host2:9042/keyspace
+     */
+    private static final Pattern HOSTS_PORT_KEYSPACE_PATTERN = Pattern.compile(
+            "^(" + HOSTS_PATTERN + ")?" // Hosts
+            + "(?::(" + PORT_PATTERN + "))?" // Port
+            + "(?:/(" + KEYSPACE_PATTERN + "))?$"); // Keyspace
+    /**
+     * Regular expression for parsing URI bean:sessionRef
+     */
+    private static final Pattern BEAN_REF_PATTERN = Pattern.compile(
+            "^bean:([\\w.\\-]+)(?:/(" + KEYSPACE_PATTERN + "))?$"); // Keyspace
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        Cluster cluster;
+        Session session;
+        String keyspace;
+        // Try URI of type cql:bean:sessionRef or cql:bean:clusterRef/keyspace
+        Matcher beanRefMatcher = BEAN_REF_PATTERN.matcher(remaining);
+        if (beanRefMatcher.matches()) {
+            String beanRefName = beanRefMatcher.group(1);
+            keyspace = beanRefMatcher.group(2);
+            Object bean = EndpointHelper.resolveParameter(getCamelContext(), "#"+beanRefName, Object.class);
+            if (bean instanceof Session) {
+                session = (Session) bean;
+                cluster = session.getCluster();
+                keyspace = session.getLoggedKeyspace();
+            } else if (bean instanceof Cluster) {
+                cluster = (Cluster) bean;
+                session = null;
+            } else {
+                throw new IllegalArgumentException("CQL Bean type should be of type Session or Cluster but was " + bean);
+            }
+        } else {
+            // Try URI of type cql:host1,host2:9042/keyspace
+            cluster = clusterBuilder(remaining, parameters).build();
+            session = null;
+            keyspace = getAndRemoveParameter(parameters, "keyspace", String.class);
+        }
+
+        Endpoint endpoint = new CassandraEndpoint(uri, this, cluster, session, keyspace);
+        setProperties(endpoint, parameters);
+        return endpoint;
+    }
+    /**
+     * Parse URI of the form cql://host1,host2:9042/keyspace and create a
+     * {@link Cluster.Builder}
+     */
+    protected Cluster.Builder clusterBuilder(String remaining, Map<String, Object> parameters) throws NumberFormatException {
+        Cluster.Builder clusterBuilder = Cluster.builder();
+        Matcher matcher = HOSTS_PORT_KEYSPACE_PATTERN.matcher(remaining);
+        if (matcher.matches()) {
+            // Parse hosts
+            String hostsGroup = matcher.group(1);
+            if (hostsGroup != null && !hostsGroup.isEmpty()) {
+                String[] hosts = hostsGroup.split(",");
+                clusterBuilder = clusterBuilder.addContactPoints(hosts);
+            }
+            // Parse port
+            String portGroup = matcher.group(2);
+            if (portGroup != null) {
+                Integer port = Integer.valueOf(portGroup);
+                clusterBuilder = clusterBuilder.withPort(port);
+            }
+            // Parse keyspace
+            String keyspaceGroup = matcher.group(3);
+            if (keyspaceGroup != null && !keyspaceGroup.isEmpty()) {
+                String keyspace = keyspaceGroup;
+                parameters.put("keyspace", keyspace);
+            }
+        } else {
+            throw new IllegalArgumentException("Invalid CQL URI");
+        }
+        // Cluster name parameter
+        String clusterName = getAndRemoveParameter(parameters, "clusterName", String.class);
+        if (clusterName != null) {
+            clusterBuilder = clusterBuilder.withClusterName(clusterName);
+        }
+        // Username and password
+        String username = getAndRemoveOrResolveReferenceParameter(parameters, "username", String.class);
+        String password = getAndRemoveOrResolveReferenceParameter(parameters, "password", String.class);
+        if (username != null && !username.isEmpty() && password!=null) {
+            clusterBuilder.withCredentials(username, password);
+        }
+        return clusterBuilder;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
new file mode 100644
index 0000000..1c7722f
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra;
+
+/**
+ * Cassandra QL Endpoint constants
+ */
+public class CassandraConstants {
+    /**
+     * In Message header: CQL Query
+     */
+    public static final String CQL_QUERY = "CamelCqlQuery";
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
new file mode 100644
index 0000000..20b0812
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -0,0 +1,55 @@
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+
+/**
+ * Cassandra 2 CQL3 consumer.
+ */
+public class CassandraConsumer extends ScheduledPollConsumer {
+
+    /**
+     * Prepared statement used for polling
+     */
+    private PreparedStatement preparedStatement;
+
+    public CassandraConsumer(CassandraEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    public CassandraEndpoint getEndpoint() {
+        return (CassandraEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // Execute CQL Query
+        Session session = getEndpoint().getSession();
+        if (preparedStatement == null) {
+            preparedStatement = getEndpoint().prepareStatement();
+        }
+        ResultSet resultSet = session.execute(preparedStatement.bind());
+        
+        // Create message from ResultSet
+        Exchange exchange = getEndpoint().createExchange();
+        Message message = exchange.getIn();
+        getEndpoint().fillMessage(resultSet, message);
+
+        try {
+            // send message to next processor in the route
+            getProcessor().process(exchange);
+            return 1; // number of messages polled
+        } finally {
+            // log exception if an exception occurred and was not handled
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
new file mode 100644
index 0000000..c7f7b11
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
@@ -0,0 +1,138 @@
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.camel.Consumer;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.utils.cassandra.CassandraSessionHolder;
+
+/**
+ * Cassandra 2 CQL3 endpoint
+ */
+public class CassandraEndpoint extends DefaultEndpoint {
+    /**
+     * Session holder
+     */
+    private CassandraSessionHolder sessionHolder;
+    /**
+     * CQL query
+     */
+    private String cql;
+    /**
+     * Consistency level: ONE, TWO, QUORUM, LOCAL_QUORUM, ALL...
+     */
+    @UriParam
+    private ConsistencyLevel consistencyLevel;
+    /**
+     * How many rows should be retrieved in message body
+     */
+    private ResultSetConversionStrategy resultSetConversionStrategy = ResultSetConversionStrategies.all();
+    /**
+     * Cassandra URI
+     *
+     * @param uri
+     * @param component Parent component
+     * @param cluster Cluster (required)
+     * @param session Session (optional)
+     * @param keyspace Keyspace (optional)
+     */
+    public CassandraEndpoint(String uri, CassandraComponent component, Cluster cluster, Session session, String keyspace) {
+        super(uri, component);
+        if (session == null) {
+            sessionHolder = new CassandraSessionHolder(cluster, keyspace);
+        } else {
+            sessionHolder = new CassandraSessionHolder(session);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        sessionHolder.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        sessionHolder.stop();
+        super.doStop();
+    }
+
+    public Producer createProducer() throws Exception {
+        return new CassandraProducer(this);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new CassandraConsumer(this, processor);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public Session getSession() {
+        return sessionHolder.getSession();
+    }
+
+    public String getCql() {
+        return cql;
+    }
+
+    public void setCql(String cql) {
+        this.cql = cql;
+    }
+
+    public String getKeyspace() {
+        return sessionHolder.getKeyspace();
+    }
+
+    public ConsistencyLevel getConsistencyLevel() {
+        return consistencyLevel;
+    }
+
+    public void setConsistencyLevel(ConsistencyLevel consistencyLevel) {
+        this.consistencyLevel = consistencyLevel;
+    }
+
+    public ResultSetConversionStrategy getResultSetConversionStrategy() {
+        return resultSetConversionStrategy;
+    }
+
+    public void setResultSetConversionStrategy(ResultSetConversionStrategy resultSetConversionStrategy) {
+        this.resultSetConversionStrategy = resultSetConversionStrategy;
+    }
+
+    public void setResultSetConversionStrategy(String converter) {
+        this.resultSetConversionStrategy = ResultSetConversionStrategies.fromName(converter);
+    }
+
+    /**
+     * Create and configure a Prepared CQL statement
+     */
+    protected PreparedStatement prepareStatement(String cql) {
+        PreparedStatement preparedStatement = getSession().prepare(cql);
+        if (consistencyLevel != null) {
+            preparedStatement.setConsistencyLevel(consistencyLevel);
+        }
+        return preparedStatement;
+    }
+    /**
+     * Create and configure a Prepared CQL statement
+     */
+    protected PreparedStatement prepareStatement() {
+        return prepareStatement(cql);
+    }
+    /**
+     * Copy ResultSet into Message.
+     */
+    protected void fillMessage(ResultSet resultSet, Message message) {
+        message.setBody(resultSetConversionStrategy.getBody(resultSet));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
new file mode 100644
index 0000000..ac429e0
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
@@ -0,0 +1,83 @@
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import java.util.Collection;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cassandra 2 CQL3 producer.
+ * <dl>
+ *  <dt>In Message</dt>
+ *  <dd>Bound parameters: Collection of Objects, Array of Objects, Simple Object</dd>
+ *  <dt>Out Message</dt>
+ *  <dd>ResultSet converted by the endpoint's ResultSetConversionStrategy (list of all Rows by default)</dd>
+ * </dl>
+ */
+public class CassandraProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraProducer.class);
+    private PreparedStatement preparedStatement;
+    public CassandraProducer(CassandraEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public CassandraEndpoint getEndpoint() {
+        return (CassandraEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Converts the message body into CQL bind values; returns {@code null} when the body is null.
+     */
+    private Object[] getCqlParams(Message message) {
+        Object body = message.getBody(Object.class);
+        if (body == null) {
+            return null; // nothing to bind: execute() then calls bind() without arguments
+        } else if (body instanceof Object[]) {
+            return (Object[]) body; // already an array of bind values
+        } else if (body instanceof Collection) {
+            return ((Collection<?>) body).toArray(); // one bind value per element
+        }
+        return new Object[]{body}; // any other body is bound as a single value
+    }
+    /**
+     * Execute CQL query using incoming message body as statement parameters.
+     */
+    private ResultSet execute(Message message) {
+        String messageCql = message.getHeader(CassandraConstants.CQL_QUERY, String.class);
+        Object[] cqlParams = getCqlParams(message);
+        
+        ResultSet resultSet;  
+        PreparedStatement lPreparedStatement;
+        if (messageCql == null || messageCql.isEmpty()) {
+            // URI CQL
+            if (preparedStatement == null) {
+                this.preparedStatement = getEndpoint().prepareStatement();
+            }
+            lPreparedStatement = this.preparedStatement;
+        } else {
+            // Message CQL
+            lPreparedStatement = getEndpoint().prepareStatement(messageCql);
+        }
+        Session session = getEndpoint().getSession();
+        if (cqlParams == null) {
+            resultSet = session.execute(lPreparedStatement.bind());
+        } else {
+            resultSet = session.execute(lPreparedStatement.bind(cqlParams));
+        }            
+        return resultSet;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        ResultSet resultSet = execute(exchange.getIn());
+        getEndpoint().fillMessage(resultSet, exchange.getOut());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategies.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategies.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategies.java
new file mode 100644
index 0000000..51ed9c4
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategies.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implementations of {@link ResultSetConversionStrategy}
+ */
+public class ResultSetConversionStrategies {
+    private static final ResultSetConversionStrategy ALL = new ResultSetConversionStrategy() {
+        @Override
+        public Object getBody(ResultSet resultSet) {
+            return resultSet.all();
+        }
+    };
+    /**
+     * Retrieve all rows.
+     * Message body contains a big list of {@link Row}s
+     */
+    public static ResultSetConversionStrategy all() {
+        return ALL;
+    }
+    private static final ResultSetConversionStrategy ONE = new ResultSetConversionStrategy() {
+        @Override
+        public Object getBody(ResultSet resultSet) {
+            return resultSet.one();
+        }
+    };
+    /**
+     * Retrieve a single row.
+     * Message body contains a single {@link Row}
+     */
+    public static ResultSetConversionStrategy one() {
+        return ONE;
+    }
+    /** Strategy copying at most rowMax rows of the ResultSet into a List used as message body. */
+    private static class LimitResultSetConversionStrategy implements ResultSetConversionStrategy {
+        private final int rowMax; // maximum number of rows copied into the message body
+        public LimitResultSetConversionStrategy(int rowMax) {
+            this.rowMax = rowMax;
+        }
+
+        @Override
+        public Object getBody(ResultSet resultSet) {
+            List<Row> rows = new ArrayList<Row>(rowMax);
+            Iterator<Row> rowIter = resultSet.iterator();
+            // Test the limit first so that hasNext() is not invoked once rowMax rows have been read
+            while (rows.size() < rowMax && rowIter.hasNext()) {
+                rows.add(rowIter.next());
+            }
+            return rows;
+        }        
+    }
+    /**
+     * Retrieve a limited list of rows.
+     * Message body contains a list of {@link Row} containing at most rowMax rows.
+     */
+    public static ResultSetConversionStrategy limit(int rowMax) {
+        return new LimitResultSetConversionStrategy(rowMax);
+    }
+    private static final Pattern LIMIT_NAME_PATTERN=Pattern.compile("^LIMIT_(\\d+)$");
+    /**
+     * Get {@link ResultSetConversionStrategy} from String
+     */
+    public static ResultSetConversionStrategy fromName(String name) {
+        if (name == null) {
+            return null;
+        }
+        if (name.equals("ALL")) {
+            return ResultSetConversionStrategies.all();
+        }
+        if (name.equals("ONE")) {
+            return ResultSetConversionStrategies.one();
+        }
+        Matcher matcher = LIMIT_NAME_PATTERN.matcher(name);
+        if (matcher.matches()) {
+            int limit = Integer.parseInt(matcher.group(1));
+            return limit(limit);
+        }
+        throw new IllegalArgumentException("Unknown conversion strategy "+name);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategy.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategy.java
new file mode 100644
index 0000000..5874d5a
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/ResultSetConversionStrategy.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.ResultSet;
+
+/**
+ * Strategy to convert {@link ResultSet} into message body
+ */
+public interface ResultSetConversionStrategy {
+    /**
+     * Build the message body from the given result set.
+     *
+     * @param resultSet result set to convert
+     * @return object to use as message body
+     */
+    Object getBody(ResultSet resultSet);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationException.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationException.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationException.java
new file mode 100644
index 0000000..cfc0049
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationException.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.aggregate.cassandra;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+
+/**
+ * Error occurred in Cassandra Aggregation repository
+ */
+public class CassandraAggregationException extends CamelExecutionException{
+
+    /**
+     * Create the exception; all arguments are handed over unchanged to
+     * {@link CamelExecutionException}.
+     *
+     * @param message error description
+     * @param exchange exchange related to the failure
+     * @param cause underlying error
+     */
+    public CassandraAggregationException(String message, Exchange exchange, Throwable cause) {
+        super(message, exchange, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
new file mode 100644
index 0000000..db900ef
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraAggregationRepository.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.aggregate.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.utils.cassandra.CassandraSessionHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.camel.utils.cassandra.CassandraUtils.*;
+
+/**
+ * Implementation of {@link AggregationRepository} using Cassandra table to store
+ * exchanges.
+ * Advice: use LeveledCompaction for this table and tune read/write consistency levels.
+ * Warning: Cassandra is not the best tool for queuing use cases
+ * @see <a href="http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets">Cassandra anti-patterns: Queues and queue-like datasets</a>
+ */
+public abstract class CassandraAggregationRepository extends ServiceSupport implements AggregationRepository {
+    /**
+     * Logger
+     */
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAggregationRepository.class);
+    /**
+     * Session holder
+     */
+    private CassandraSessionHolder sessionHolder;
+    /**
+     * Table name
+     */
+    private String table = "CAMEL_AGGREGATION";
+    /**
+     * Exchange Id column name
+     */
+    private String exchangeIdColumn="EXCHANGE_ID";
+    /**
+     * Exchange column name
+     */
+    private String exchangeColumn="EXCHANGE";
+    /**
+     * Primary key columns
+     */
+    private String[] pkColumns;
+    /**
+     * Exchange marshaller/unmarshaller
+     */
+    private final CassandraCamelCodec exchangeCodec = new CassandraCamelCodec();
+    /**
+     * Time to live in seconds used for inserts
+     */
+    private Integer ttl;
+    /**
+     * Write consistency level
+     */
+    private ConsistencyLevel writeConsistencyLevel;
+    /**
+     * Read consistency level
+     */
+    private ConsistencyLevel readConsistencyLevel;
+    
+    private PreparedStatement insertStatement;
+    private PreparedStatement selectStatement;
+    private PreparedStatement deleteStatement;
+    /**
+     * Prepared statement used to get keys and exchange ids
+     */
+    private PreparedStatement selectKeyIdStatement;
+    /**
+     * Prepared statement used to delete with key and exchange id
+     */
+    private PreparedStatement deleteIfIdStatement;
+
+    public CassandraAggregationRepository() {
+    }
+
+    public CassandraAggregationRepository(Session session) {
+        this.sessionHolder = new CassandraSessionHolder(session);
+    }
+    public CassandraAggregationRepository(Cluster cluster, String keyspace) {
+        this.sessionHolder = new CassandraSessionHolder(cluster, keyspace);
+    }
+    /**
+     * Get fixed primary key values.
+     */
+    protected abstract Object[] getPKValues();
+    /**
+     * Generate primary key values: fixed + aggregation key.
+     */
+    protected Object[] getPKValues(String key) {
+        return append(getPKValues(), key);
+    }
+    /**
+     * Get aggregation key colum name.
+     */
+    private String getKeyColumn() {
+        return pkColumns[pkColumns.length-1];
+    }
+    private String[] getAllColumns() {
+        return append(pkColumns, exchangeIdColumn, exchangeColumn);
+    }
+    //--------------------------------------------------------------------------
+    // Service support
+    
+    /**
+     * Start the session holder, then prepare all CQL statements of this repository.
+     * Each init method prepares its statement through getSession(), which reads
+     * the session from the session holder.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        sessionHolder.start();
+        initInsertStatement();
+        initSelectStatement();
+        initDeleteStatement();
+        initSelectKeyIdStatement();
+        initDeleteIfIdStatement();
+    }
+
+    /**
+     * Stop the session holder.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        sessionHolder.stop();
+    }
+
+    // -------------------------------------------------------------------------
+    // Add exchange to repository
+
+    private void initInsertStatement() {
+        String cql = generateInsert(table, 
+                getAllColumns(), 
+                false, ttl).toString();
+        LOGGER.debug("Generated Insert {}", cql);
+        insertStatement = applyConsistencyLevel(getSession().prepare(cql), writeConsistencyLevel);
+    }
+    /**
+     * Store the exchange in the aggregation table under the given aggregation key.
+     *
+     * @param camelContext context handed to the codec when marshalling
+     * @param key aggregation key, appended to the fixed primary key values
+     * @param exchange exchange to persist
+     * @return the exchange received as argument
+     * @throws CassandraAggregationException when marshalling raises an IOException
+     */
+    @Override
+    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
+        final Object[] pkValues = getPKValues(key);
+        LOGGER.debug("Inserting key {} exchange {}", pkValues, exchange);
+        try {
+            final ByteBuffer serializedExchange = exchangeCodec.marshallExchange(camelContext, exchange);
+            final Object[] exchangeValues = {exchange.getExchangeId(), serializedExchange};
+            // Bind order follows the insert columns: primary key, exchange id, exchange
+            final Object[] boundValues = concat(pkValues, exchangeValues);
+            getSession().execute(insertStatement.bind(boundValues));
+            return exchange;
+        } catch (IOException marshallingFailure) {
+            throw new CassandraAggregationException("Failed to write exchange", exchange, marshallingFailure);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Get exchange from repository
+
+    protected void initSelectStatement() {
+        String cql = generateSelect(table, 
+                getAllColumns(), 
+                pkColumns).toString();
+        LOGGER.debug("Generated Select {}", cql);
+        selectStatement = applyConsistencyLevel(getSession().prepare(cql), readConsistencyLevel);
+    }
+    /**
+     * Get exchange from aggregation table by aggregation key.
+     *
+     * @param camelContext context handed to the codec when unmarshalling
+     * @param key aggregation key
+     * @return the stored exchange, or null when no row matches the key
+     * @throws CassandraAggregationException when the stored exchange can not be read
+     */
+    @Override
+    public Exchange get(CamelContext camelContext, String key) {
+        Object[] pkValues = getPKValues(key);
+        // Cast to Object: a bare Object[] would be expanded as varargs and
+        // only its first element would be logged
+        LOGGER.debug("Selecting key {} ", (Object) pkValues);
+        Row row = getSession().execute(selectStatement.bind(pkValues)).one();
+        if (row == null) {
+            return null;
+        }
+        try {
+            return exchangeCodec.unmarshallExchange(camelContext, row.getBytes(exchangeColumn));
+        } catch (IOException iOException) {
+            // No exchange could be rebuilt, hence none is attached to the exception
+            throw new CassandraAggregationException("Failed to read exchange", null, iOException);
+        } catch (ClassNotFoundException classNotFoundException) {
+            throw new CassandraAggregationException("Failed to read exchange", null, classNotFoundException);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Confirm exchange in repository
+    private void initDeleteIfIdStatement() {
+        StringBuilder cqlBuilder = generateDelete(table, pkColumns, false);
+        cqlBuilder.append(" if ").append(exchangeIdColumn).append("=?");
+        String cql = cqlBuilder.toString();
+        LOGGER.debug("Generated Delete If Id {}", cql);
+        deleteIfIdStatement = applyConsistencyLevel(getSession().prepare(cql), writeConsistencyLevel);
+    }
+
+    /**
+     * Remove exchange by Id from aggregation table.
+     * All key / exchange id pairs found for the fixed primary key values are
+     * scanned; each row holding the given exchange id is deleted with the
+     * statement conditioned on the exchange id.
+     *
+     * @param camelContext not used by this implementation
+     * @param exchangeId id of the exchange to remove
+     */
+    @Override
+    public void confirm(CamelContext camelContext, String exchangeId) {
+        Object[] pkValues = getPKValues();
+        String keyColumn = getKeyColumn();
+        // Cast to Object: a bare Object[] would be expanded as varargs and
+        // only its first element would be logged
+        LOGGER.debug("Selecting Ids {} ", (Object) pkValues);
+        List<Row> rows = selectKeyIds();
+        for (Row row : rows) {
+            // Rows without exchange id are skipped instead of raising a NullPointerException
+            String rowExchangeId = row.getString(exchangeIdColumn);
+            if (rowExchangeId != null && rowExchangeId.equals(exchangeId)) {
+                String key = row.getString(keyColumn);
+                Object[] cqlParams = append(pkValues, key, exchangeId);
+                LOGGER.debug("Deleting If Id {} ", (Object) cqlParams);
+                getSession().execute(deleteIfIdStatement.bind(cqlParams));
+            }
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Remove exchange from repository
+
+    private void initDeleteStatement() {
+        String cql = generateDelete(table, pkColumns, false).toString();
+        LOGGER.debug("Generated Delete {}", cql);
+        deleteStatement = applyConsistencyLevel(getSession().prepare(cql), writeConsistencyLevel);
+    }
+
+    /**
+     * Delete the row stored under the given aggregation key.
+     *
+     * @param camelContext not used by this implementation
+     * @param key aggregation key of the row to delete
+     * @param exchange not used by this implementation
+     */
+    @Override
+    public void remove(CamelContext camelContext, String key, Exchange exchange) {
+        final Object[] pkValues = getPKValues(key);
+        LOGGER.debug("Deleting key {}", (Object) pkValues);
+        final Session session = getSession();
+        session.execute(deleteStatement.bind(pkValues));
+    }
+    // -------------------------------------------------------------------------
+    private void initSelectKeyIdStatement() { 
+        String cql = generateSelect(table, 
+                new String[]{getKeyColumn(), exchangeIdColumn}, // Key + Exchange Id columns
+                pkColumns, pkColumns.length-1).toString(); // Where fixed PK columns
+        LOGGER.debug("Generated Select keys {}", cql);
+        selectKeyIdStatement = applyConsistencyLevel(getSession().prepare(cql), readConsistencyLevel);
+    }
+
+    /**
+     * Select aggregation key and exchange id of every row matching the fixed
+     * primary key values.
+     *
+     * @return all rows returned by the select key/id statement
+     */
+    private List<Row> selectKeyIds() {
+        Object[] pkValues = getPKValues();
+        // Cast to Object: a bare Object[] would be expanded as varargs and
+        // only its first element would be logged
+        LOGGER.debug("Selecting keys {}", (Object) pkValues);
+        return getSession().execute(selectKeyIdStatement.bind(pkValues)).all();
+    }
+
+    /**
+     * Get aggregation keys from aggregation table.
+     *
+     * @return distinct values of the aggregation key column among the rows
+     *         matching the fixed primary key values
+     */
+    @Override
+    public Set<String> getKeys() {
+        List<Row> rows = selectKeyIds();
+        Set<String> keys = new HashSet<String>(rows.size());
+        // The aggregation key is the last primary key column (the one selected
+        // by the select key/id statement), not necessarily the second one
+        String keyColumnName = getKeyColumn();
+        for (Row row : rows) {
+            keys.add(row.getString(keyColumnName));
+        }
+        return keys;
+    }
+
+
+    // -------------------------------------------------------------------------
+    // Getters and Setters
+
+    public Session getSession() {
+        return sessionHolder.getSession();
+    }
+
+    public void setSession(Session session) {
+        this.sessionHolder = new CassandraSessionHolder(session);
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+    public String[] getPKColumns() {
+        return pkColumns;
+    }
+    public void setPKColumns(String ... pkColumns) {
+        this.pkColumns = pkColumns;
+    }
+
+    public String getExchangeIdColumn() {
+        return exchangeIdColumn;
+    }
+
+    public void setExchangeIdColumn(String exchangeIdColumn) {
+        this.exchangeIdColumn = exchangeIdColumn;
+    }
+
+    public ConsistencyLevel getWriteConsistencyLevel() {
+        return writeConsistencyLevel;
+    }
+
+    public void setWriteConsistencyLevel(ConsistencyLevel writeConsistencyLevel) {
+        this.writeConsistencyLevel = writeConsistencyLevel;
+    }
+
+    public ConsistencyLevel getReadConsistencyLevel() {
+        return readConsistencyLevel;
+    }
+
+    public void setReadConsistencyLevel(ConsistencyLevel readConsistencyLevel) {
+        this.readConsistencyLevel = readConsistencyLevel;
+    }
+    
+    public String getExchangeColumn() {
+        return exchangeColumn;
+    }
+
+    public void setExchangeColumn(String exchangeColumnName) {
+        this.exchangeColumn = exchangeColumnName;
+    }
+
+    public Integer getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(Integer ttl) {
+        this.ttl = ttl;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
new file mode 100644
index 0000000..e060678
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.camel.processor.aggregate.cassandra;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+
+/**
+ * Marshall/unmarshall Exchange to/from a ByteBuffer.
+ * Inspired from JdbcCamelCodec. TODO Find how to share code
+ */
+public class CassandraCamelCodec {
+
+    /**
+     * Serialize an exchange into a byte buffer using Java serialization of a
+     * {@link DefaultExchangeHolder}.
+     *
+     * @param camelContext not used by this method
+     * @param exchange exchange to serialize
+     * @return buffer wrapping the serialized holder
+     * @throws IOException when serialization fails
+     */
+    public ByteBuffer marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
+        // use DefaultExchangeHolder to marshal to a serialized object
+        DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
+        // add the aggregated size and timeout property as the only properties we want to retain
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_TIMEOUT, exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, Long.class));
+        // add the aggregated completed by property to retain
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
+        // add the aggregated correlation key property to retain
+        DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
+        // persist the from endpoint as well
+        if (exchange.getFromEndpoint() != null) {
+            DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
+        }
+        return ByteBuffer.wrap(serialize(pe));
+    }
+
+    /**
+     * Rebuild an exchange from a buffer produced by
+     * {@link #marshallExchange(CamelContext, Exchange)}.
+     * The remaining bytes of the buffer are consumed while reading.
+     *
+     * @param camelContext context of the new exchange, also used to look up the from endpoint
+     * @param buffer serialized exchange holder
+     * @return the restored exchange
+     * @throws IOException when the buffer can not be deserialized
+     * @throws ClassNotFoundException when a serialized class is not available
+     */
+    public Exchange unmarshallExchange(CamelContext camelContext, ByteBuffer buffer) throws IOException, ClassNotFoundException {
+        DefaultExchangeHolder pe = (DefaultExchangeHolder) deserialize(new ByteBufferInputStream(buffer));
+        Exchange answer = new DefaultExchange(camelContext);
+        DefaultExchangeHolder.unmarshal(answer, pe);
+        // restore the from endpoint
+        String fromEndpointUri = (String) answer.removeProperty("CamelAggregatedFromEndpoint");
+        if (fromEndpointUri != null) {
+            Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
+            if (fromEndpoint != null) {
+                answer.setFromEndpoint(fromEndpoint);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Java-serialize an object into a byte array.
+     */
+    private byte[] serialize(Object object) throws IOException {
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        ObjectOutputStream objectOut = new ObjectOutputStream(bytesOut);
+        objectOut.writeObject(object);
+        // close before reading the bytes so that buffered data is flushed
+        objectOut.close();
+        return bytesOut.toByteArray();
+    }
+
+    /**
+     * Java-deserialize one object from the given stream.
+     */
+    private Object deserialize(InputStream bytes) throws IOException, ClassNotFoundException {
+        ObjectInputStream objectIn = new ObjectInputStream(bytes);
+        Object object = objectIn.readObject();
+        objectIn.close();
+        return object;
+    }
+
+    /**
+     * Input stream reading the remaining bytes of a {@link ByteBuffer}.
+     */
+    private static class ByteBufferInputStream extends InputStream {
+
+        private final ByteBuffer buffer;
+
+        public ByteBufferInputStream(ByteBuffer buffer) {
+            this.buffer = buffer;
+        }
+
+        /**
+         * Read next byte as a value between 0 and 255, or -1 at end of buffer.
+         */
+        @Override
+        public int read() throws IOException {
+            if (!buffer.hasRemaining()) {
+                return -1;
+            }
+            // Mask the signed byte: returned as is, 0xFF would be -1 and be
+            // mistaken for the end of stream
+            return buffer.get() & 0xFF;
+        }
+
+        /**
+         * Read up to len bytes, returning the number of bytes read, 0 when
+         * len is 0, or -1 at end of buffer.
+         */
+        @Override
+        public int read(byte[] bytes, int off, int len) throws IOException {
+            if (len == 0) {
+                // InputStream contract: a zero length read returns 0, even at end of stream
+                return 0;
+            }
+            if (!buffer.hasRemaining()) {
+                return -1;
+            }
+            int count = Math.min(len, buffer.remaining());
+            buffer.get(bytes, off, count);
+            return count;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
new file mode 100644
index 0000000..5448207
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/NamedCassandraAggregationRepository.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.aggregate.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+/**
+ * Concrete {@link CassandraAggregationRepository} whose primary key is made of
+ * 2 columns: name (partition key) and key (clustering key).
+ */
+public class NamedCassandraAggregationRepository extends CassandraAggregationRepository {
+    /**
+     * Name column (partition key)
+     */
+    private static final String NAME_COLUMN = "NAME";
+    /**
+     * Key column (clustering key)
+     */
+    private static final String KEY_COLUMN = "KEY";
+    /**
+     * Aggregation repository name
+     */
+    private String name;
+
+    /**
+     * Create a repository without session nor name; only primary key columns are set.
+     */
+    public NamedCassandraAggregationRepository() {
+        setPKColumns(NAME_COLUMN, KEY_COLUMN);
+    }
+
+    /**
+     * Create a named repository on top of the given session.
+     */
+    public NamedCassandraAggregationRepository(Session session, String name) {
+        super(session);
+        this.name = name;
+        setPKColumns(NAME_COLUMN, KEY_COLUMN);
+    }
+
+    /**
+     * Create a named repository on top of the given cluster and keyspace.
+     */
+    public NamedCassandraAggregationRepository(Cluster cluster, String keyspace, String name) {
+        super(cluster, keyspace);
+        this.name = name;
+        setPKColumns(NAME_COLUMN, KEY_COLUMN);
+    }
+
+    /**
+     * The only fixed primary key value is the repository name.
+     */
+    @Override
+    protected Object[] getPKValues() {
+        final Object[] fixedValues = {name};
+        return fixedValues;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
new file mode 100644
index 0000000..4bdf496
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/CassandraIdempotentRepository.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.utils.cassandra.CassandraSessionHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.camel.utils.cassandra.CassandraUtils.*;
+
+/**
+ * Implementation of {@link IdempotentRepository} using Cassandra table to store
+ * message ids.
+ * Advice: use LeveledCompaction for this table and tune read/write consistency levels.
+ * Warning: Cassandra is not the best tool for queuing use cases
+ * @see <a href="http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets">Cassandra anti-patterns: Queues and queue-like datasets</a>
+ * @param <K> Message Id
+ */
+public abstract class CassandraIdempotentRepository<K> extends ServiceSupport implements IdempotentRepository<K> {
+    /**
+     * Logger
+     */
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraIdempotentRepository.class);
+    /**
+     * Session holder
+     */
+    private CassandraSessionHolder sessionHolder;
+    /**
+     * Table name
+     */
+    private String table = "CAMEL_IDEMPOTENT";
+    /**
+     * Primary key columns
+     */
+    private String[] pkColumns;
+    /**
+     * Time to live in seconds used for inserts
+     */
+    private Integer ttl;
+    /**
+     * Write consistency level
+     */
+    private ConsistencyLevel writeConsistencyLevel;
+    /**
+     * Read consistency level
+     */
+    private ConsistencyLevel readConsistencyLevel;
+    private PreparedStatement insertStatement;
+    private PreparedStatement selectStatement;
+    private PreparedStatement deleteStatement;
+
+    public CassandraIdempotentRepository() {
+    }
+
+    public CassandraIdempotentRepository(Session session) {
+        this.sessionHolder = new CassandraSessionHolder(session);
+    }
+    public CassandraIdempotentRepository(Cluster cluster, String keyspace) {
+        this.sessionHolder = new CassandraSessionHolder(cluster, keyspace);
+    }
+
+    private boolean isKey(ResultSet resultSet) {
+        Row row = resultSet.one(); 
+        if (row==null) {
+            LOGGER.debug("No row to check key");
+            return false;
+        } else {
+            LOGGER.debug("Row with {} columns to check key", row.getColumnDefinitions());            
+            return row.getColumnDefinitions().size()>1;
+        }
+    }
+    /**
+     * Get the values bound to the prepared statements for the given message id.
+     * NOTE(review): expected to match the primary key columns in number and
+     * order, confirm against the CQL generation helpers.
+     */
+    protected abstract Object[] getPKValues(K key);
+    // -------------------------------------------------------------------------
+    // Lifecycle methods
+
+    /**
+     * Start the session holder, then prepare insert, select and delete statements.
+     * Each init method prepares its statement through getSession(), which reads
+     * the session from the session holder.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        sessionHolder.start();
+        initInsertStatement();
+        initSelectStatement();
+        initDeleteStatement();
+    }
+
+    /**
+     * Stop the session holder.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        sessionHolder.stop();
+    }
+    // -------------------------------------------------------------------------
+    // Add key to repository
+
+    protected void initInsertStatement() {
+        String cql = generateInsert(table, pkColumns, true, ttl).toString();
+        LOGGER.debug("Generated Insert {}", cql);
+        insertStatement = applyConsistencyLevel(getSession().prepare(cql), writeConsistencyLevel);
+    }
+
+    /**
+     * Insert the message id.
+     *
+     * @param key message id
+     * @return true unless the insert result is recognized as an existing key by isKey
+     */
+    @Override
+    public boolean add(K key) {
+        final Object[] pkValues = getPKValues(key);
+        LOGGER.debug("Inserting key {}", (Object) pkValues);
+        final ResultSet insertResult = getSession().execute(insertStatement.bind(pkValues));
+        final boolean alreadyPresent = isKey(insertResult);
+        return !alreadyPresent;
+    }
+
+    // -------------------------------------------------------------------------
+    // Check if key is in repository
+
+    protected void initSelectStatement() {
+        String cql = generateSelect(table, pkColumns, pkColumns).toString();
+        LOGGER.debug("Generated Select {}", cql);
+        selectStatement = applyConsistencyLevel(getSession().prepare(cql), readConsistencyLevel);
+    }
+
+    /**
+     * Check whether the message id is stored.
+     *
+     * @param key message id
+     * @return true when the select result is recognized as an existing key by isKey
+     */
+    @Override
+    public boolean contains(K key) {
+        final Object[] pkValues = getPKValues(key);
+        LOGGER.debug("Checking key {}", (Object) pkValues);
+        final ResultSet selectResult = getSession().execute(selectStatement.bind(pkValues));
+        return isKey(selectResult);
+    }
+
+    /**
+     * Nothing is done on confirmation; always answers true.
+     */
+    @Override
+    public boolean confirm(K key) {
+        return true;
+    }
+
+    // -------------------------------------------------------------------------
+    // Remove key from repository
+
+    protected void initDeleteStatement() {
+        String cql = generateDelete(table, pkColumns, true).toString();
+        LOGGER.debug("Generated Delete {}", cql);
+        deleteStatement = applyConsistencyLevel(getSession().prepare(cql), writeConsistencyLevel);
+    }
+
+    /**
+     * Delete the message id.
+     *
+     * @param key message id
+     * @return always true, the delete result is not inspected
+     */
+    @Override
+    public boolean remove(K key) {
+        final Object[] pkValues = getPKValues(key);
+        LOGGER.debug("Deleting key {}", (Object) pkValues);
+        final Session session = getSession();
+        session.execute(deleteStatement.bind(pkValues));
+        return true;
+    }
+    // -------------------------------------------------------------------------
+    // Getters & Setters
+
+    public Session getSession() {
+        return sessionHolder.getSession();
+    }
+
+    public void setSession(Session session) {
+        this.sessionHolder = new CassandraSessionHolder(session);
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String[] getPKColumns() {
+        return pkColumns;
+    }
+
+    public void setPKColumns(String... pkColumns) {
+        this.pkColumns=pkColumns;
+    }
+
+    public Integer getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(Integer ttl) {
+        this.ttl = ttl;
+    }
+
+    public ConsistencyLevel getWriteConsistencyLevel() {
+        return writeConsistencyLevel;
+    }
+
+    public void setWriteConsistencyLevel(ConsistencyLevel writeConsistencyLevel) {
+        this.writeConsistencyLevel = writeConsistencyLevel;
+    }
+
+    public ConsistencyLevel getReadConsistencyLevel() {
+        return readConsistencyLevel;
+    }
+
+    public void setReadConsistencyLevel(ConsistencyLevel readConsistencyLevel) {
+        this.readConsistencyLevel = readConsistencyLevel;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
new file mode 100644
index 0000000..03d73ec
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/idempotent/cassandra/NamedCassandraIdempotentRepository.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.idempotent.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+/**
+ * {@link CassandraIdempotentRepository} flavour whose primary key is made of
+ * 2 columns: NAME (partition key, holds the repository name) and KEY
+ * (clustering key, holds the idempotent key).
+ */
+public class NamedCassandraIdempotentRepository<K> extends CassandraIdempotentRepository<K> {
+    /**
+     * Primary key column receiving the repository name
+     */
+    private static final String NAME_COLUMN = "NAME";
+    /**
+     * Primary key column receiving the idempotent key
+     */
+    private static final String KEY_COLUMN = "KEY";
+    /**
+     * Idempotent repository name, used as first primary key value
+     */
+    private String name;
+
+    /**
+     * Creates a repository without session nor name; only the primary key
+     * columns are configured.
+     */
+    public NamedCassandraIdempotentRepository() {
+        setPKColumns(NAME_COLUMN, KEY_COLUMN);
+    }
+
+    /**
+     * Creates a named repository on top of the given session.
+     *
+     * @param session session passed to the parent constructor
+     * @param name repository name
+     */
+    public NamedCassandraIdempotentRepository(Session session, String name) {
+        super(session);
+        this.name = name;
+        setPKColumns(NAME_COLUMN, KEY_COLUMN);
+    }
+
+    /**
+     * Creates a named repository on top of the given cluster and keyspace.
+     *
+     * @param cluster cluster passed to the parent constructor
+     * @param keyspace keyspace passed to the parent constructor
+     * @param name repository name
+     */
+    public NamedCassandraIdempotentRepository(Cluster cluster, String keyspace, String name) {
+        super(cluster, keyspace);
+        this.name = name;
+        setPKColumns(NAME_COLUMN, KEY_COLUMN);
+    }
+
+    /**
+     * Builds the primary key values for the given key.
+     *
+     * @param key idempotent key
+     * @return 2 element array: repository name, then key
+     */
+    @Override
+    protected Object[] getPKValues(K key) {
+        final Object[] pkValues = new Object[2];
+        pkValues[0] = this.name;
+        pkValues[1] = key;
+        return pkValues;
+    }
+
+    /**
+     * @return repository name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name repository name
+     */
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraSessionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraSessionHolder.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraSessionHolder.java
new file mode 100644
index 0000000..82a7a72
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraSessionHolder.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.utils.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+/**
+ * Holds a Cassandra Session and manages its lifecycle.
+ * <p>
+ * When built from a {@link Cluster}, the holder opens the session in
+ * {@link #start()} and closes it in {@link #stop()}. When built from an
+ * existing {@link Session}, the session belongs to the caller: the holder
+ * neither opens nor closes it.
+ */
+public class CassandraSessionHolder {
+    /**
+     * Cluster
+     */
+    private final Cluster cluster;
+    /**
+     * Session, null as long as a holder managed session has not been started
+     */
+    private Session session;
+    /**
+     * Keyspace name
+     */
+    private String keyspace;
+    /**
+     * Indicates whether Session is managed (opened and closed) by this holder:
+     * true when the holder was built from a Cluster, false when the Session
+     * was supplied by the caller
+     */
+    private final boolean managedSession;
+
+    /**
+     * Creates a holder which opens and closes its own session.
+     *
+     * @param cluster cluster used to connect in {@link #start()}
+     * @param keyspace keyspace to connect to, or null to connect without keyspace
+     */
+    public CassandraSessionHolder(Cluster cluster, String keyspace) {
+        this.cluster = cluster;
+        this.keyspace = keyspace;
+        this.managedSession = true;
+    }
+
+    /**
+     * Creates a holder around a session owned by the caller.
+     *
+     * @param session existing session; cluster and keyspace are read from it
+     */
+    public CassandraSessionHolder(Session session) {
+        this.cluster = session.getCluster();
+        this.session = session;
+        this.keyspace = session.getLoggedKeyspace();
+        this.managedSession = false;
+    }
+
+    /**
+     * Opens the session when it is managed by this holder, does nothing
+     * otherwise.
+     */
+    public void start() {
+        if (managedSession) {
+            if (keyspace == null) {
+                this.session = cluster.connect();
+            } else {
+                this.session = cluster.connect(keyspace);
+            }
+        }
+    }
+
+    /**
+     * Closes the session when it is managed by this holder, does nothing
+     * otherwise.
+     */
+    public void stop() {
+        // Only close what start() opened: a session supplied by the caller
+        // stays under the caller's control. The null check covers stop()
+        // being called without a prior start().
+        if (managedSession && session != null) {
+            session.close();
+            session = null;
+        }
+    }
+
+    /**
+     * @return current session, null when a managed session is not started
+     */
+    public Session getSession() {
+        return session;
+    }
+
+    /**
+     * @return cluster given at construction or read from the supplied session
+     */
+    public Cluster getCluster() {
+        return cluster;
+    }
+
+    /**
+     * @return keyspace given at construction or read from the supplied session
+     */
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
new file mode 100644
index 0000000..4b18d8c
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/utils/cassandra/CassandraUtils.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.utils.cassandra;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+
+/**
+ * Static helpers to concatenate arrays and to generate CQL statements
+ * (insert, select, delete) using ? placeholders.
+ */
+public class CassandraUtils {
+    /**
+     * Apply consistency level if provided, else leave default.
+     *
+     * @param statement statement to configure
+     * @param consistencyLevel consistency level, null to leave statement untouched
+     * @return the given statement
+     */
+    public static PreparedStatement applyConsistencyLevel(PreparedStatement statement, ConsistencyLevel consistencyLevel) {
+        if (consistencyLevel != null) {
+            statement.setConsistencyLevel(consistencyLevel);
+        }
+        return statement;
+    }
+
+    /**
+     * Concatenate 2 arrays.
+     *
+     * @return new array containing array1 elements followed by array2 elements
+     */
+    public static Object[] concat(Object[] array1, Object[] array2) {
+        Object[] array = new Object[array1.length + array2.length];
+        System.arraycopy(array1, 0, array, 0, array1.length);
+        System.arraycopy(array2, 0, array, array1.length, array2.length);
+        return array;
+    }
+
+    /**
+     * Concatenate 2 arrays.
+     *
+     * @return new array containing array1 elements followed by array2 elements
+     */
+    public static String[] concat(String[] array1, String[] array2) {
+        String[] array = new String[array1.length + array2.length];
+        System.arraycopy(array1, 0, array, 0, array1.length);
+        System.arraycopy(array2, 0, array, array1.length, array2.length);
+        return array;
+    }
+
+    /**
+     * Append values to given array.
+     *
+     * @return new array, the given array is not modified
+     */
+    public static Object[] append(Object[] array1, Object... array2) {
+        return concat(array1, array2);
+    }
+
+    /**
+     * Append values to given array.
+     *
+     * @return new array, the given array is not modified
+     */
+    public static String[] append(String[] array1, String... array2) {
+        return concat(array1, array2);
+    }
+
+    /**
+     * Append columns to CQL.
+     *
+     * @param cqlBuilder CQL being built
+     * @param columns column names
+     * @param sep separator inserted between 2 columns
+     * @param maxColumnIndex number of leading columns to append (exclusive upper index)
+     */
+    public static void appendColumns(StringBuilder cqlBuilder, String[] columns, String sep, int maxColumnIndex) {
+        for (int i = 0; i < maxColumnIndex; i++) {
+            if (i > 0) {
+                cqlBuilder.append(sep);
+            }
+            cqlBuilder.append(columns[i]);
+        }
+    }
+
+    /**
+     * Append all columns to CQL.
+     */
+    public static void appendColumns(StringBuilder cqlBuilder, String[] columns, String sep) {
+        appendColumns(cqlBuilder, columns, sep, columns.length);
+    }
+
+    /**
+     * Append where columns = ? to CQL.
+     * At least one column is expected: with maxColumnIndex 0 the result is
+     * " where =?".
+     *
+     * @param maxColumnIndex number of leading columns to use (exclusive upper index)
+     */
+    public static void appendWhere(StringBuilder cqlBuilder, String[] columns, int maxColumnIndex) {
+        cqlBuilder.append(" where ");
+        appendColumns(cqlBuilder, columns, "=? and ", maxColumnIndex);
+        // Separator only sits between columns, last column still needs its placeholder
+        cqlBuilder.append("=?");
+    }
+
+    /**
+     * Append where columns = ? to CQL, using all columns.
+     */
+    public static void appendWhere(StringBuilder cqlBuilder, String[] columns) {
+        appendWhere(cqlBuilder, columns, columns.length);
+    }
+
+    /**
+     * Append ?,? to CQL.
+     *
+     * @param columnCount number of placeholders
+     */
+    public static void appendPlaceholders(StringBuilder cqlBuilder, int columnCount) {
+        for (int i = 0; i < columnCount; i++) {
+            if (i > 0) {
+                cqlBuilder.append(",");
+            }
+            cqlBuilder.append("?");
+        }
+    }
+
+    /**
+     * Generate Insert CQL.
+     *
+     * @param table table name
+     * @param columns inserted columns, one placeholder is generated per column
+     * @param ifNotExists whether to add the "if not exists" condition
+     * @param ttl time to live appended as "using ttl", null for none
+     * @return CQL builder
+     */
+    public static StringBuilder generateInsert(String table, String[] columns, boolean ifNotExists, Integer ttl) {
+        StringBuilder cqlBuilder = new StringBuilder("insert into ")
+                .append(table).append("(");
+        appendColumns(cqlBuilder, columns, ",");
+        cqlBuilder.append(") values (");
+        appendPlaceholders(cqlBuilder, columns.length);
+        cqlBuilder.append(")");
+        if (ifNotExists) {
+            cqlBuilder.append(" if not exists");
+        }
+        if (ttl != null) {
+            // CQL syntax is "USING TTL <seconds>", there is no equals sign
+            cqlBuilder.append(" using ttl ").append(ttl);
+        }
+        return cqlBuilder;
+    }
+
+    /**
+     * Generate select where columns = ? CQL, using all where columns.
+     */
+    public static StringBuilder generateSelect(String table, String[] selectColumns, String[] whereColumns) {
+        return generateSelect(table, selectColumns, whereColumns, whereColumns.length);
+    }
+
+    /**
+     * Generate select where columns = ? CQL.
+     *
+     * @param table table name
+     * @param selectColumns selected columns
+     * @param whereColumns columns of the where clause
+     * @param whereColumnsMaxIndex number of leading where columns to use
+     * @return CQL builder
+     */
+    public static StringBuilder generateSelect(String table, String[] selectColumns, String[] whereColumns, int whereColumnsMaxIndex) {
+        StringBuilder cqlBuilder = new StringBuilder("select ");
+        appendColumns(cqlBuilder, selectColumns, ",");
+        cqlBuilder.append(" from ").append(table);
+        appendWhere(cqlBuilder, whereColumns, whereColumnsMaxIndex);
+        return cqlBuilder;
+    }
+
+    /**
+     * Generate delete where columns = ? CQL, using all where columns.
+     */
+    public static StringBuilder generateDelete(String table, String[] whereColumns, boolean ifExists) {
+        return generateDelete(table, whereColumns, whereColumns.length, ifExists);
+    }
+
+    /**
+     * Generate delete where columns = ? CQL.
+     *
+     * @param table table name
+     * @param whereColumns columns of the where clause
+     * @param whereColumnsMaxIndex number of leading where columns to use
+     * @param ifExists whether to add the "if exists" condition
+     * @return CQL builder
+     */
+    public static StringBuilder generateDelete(String table, String[] whereColumns, int whereColumnsMaxIndex, boolean ifExists) {
+        StringBuilder cqlBuilder = new StringBuilder("delete from ")
+            .append(table);
+        appendWhere(cqlBuilder, whereColumns, whereColumnsMaxIndex);
+        if (ifExists) {
+            cqlBuilder.append(" if exists");
+        }
+        return cqlBuilder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/main/resources/META-INF/services/org/apache/camel/component/cql
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/main/resources/META-INF/services/org/apache/camel/component/cql b/components/camel-cassandraql/src/main/resources/META-INF/services/org/apache/camel/component/cql
new file mode 100644
index 0000000..8d80a4f
--- /dev/null
+++ b/components/camel-cassandraql/src/main/resources/META-INF/services/org/apache/camel/component/cql
@@ -0,0 +1 @@
+class=org.apache.camel.component.cassandra.CassandraComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentBeanRefTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentBeanRefTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentBeanRefTest.java
new file mode 100644
index 0000000..f3efc27
--- /dev/null
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentBeanRefTest.java
@@ -0,0 +1,77 @@
+package org.apache.camel.component.cassandra;
+
+import org.apache.camel.component.cassandra.CassandraEndpoint;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import java.net.URLEncoder;
+import java.util.Arrays;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.cassandraunit.CassandraCQLUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Checks that a Cassandra endpoint can be created from a Session or a
+ * Cluster bean looked up in the registry, the CQL being a registry bean too.
+ */
+public class CassandraComponentBeanRefTest extends CamelTestSupport {
+    public static final String CQL = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)";
+    public static final String SESSION_URI = "cql:bean:cassandraSession?cql=#insertCql";
+    public static final String CLUSTER_URI = "cql:bean:cassandraCluster/camel_ks?cql=#insertCql";
+
+    @Rule
+    public CassandraCQLUnit cassandra = CassandraUnitUtils.cassandraCQLUnit();
+    @Produce(uri = "direct:input")
+    ProducerTemplate producerTemplate;
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        CassandraUnitUtils.startEmbeddedCassandra();
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        CassandraUnitUtils.cleanEmbeddedCassandra();
+    }
+
+    /**
+     * Binds the cluster, a session on camel_ks keyspace and the insert CQL
+     * in the registry.
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        Cluster.Builder builder = Cluster.builder();
+        builder.addContactPoint("localhost");
+        Cluster localCluster = builder.build();
+        jndi.bind("cassandraCluster", localCluster);
+        jndi.bind("cassandraSession", localCluster.connect("camel_ks"));
+        jndi.bind("insertCql", CQL);
+        return jndi;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:inputSession").to(SESSION_URI);
+                from("direct:inputCluster").to(CLUSTER_URI);
+            }
+        };
+    }
+
+    @Test
+    public void testSession() throws Exception {
+        assertKeyspaceAndCql(SESSION_URI);
+    }
+
+    @Test
+    public void testCluster() throws Exception {
+        assertKeyspaceAndCql(CLUSTER_URI);
+    }
+
+    /**
+     * Looks up the endpoint and checks its keyspace and CQL.
+     */
+    private void assertKeyspaceAndCql(String endpointUri) throws Exception {
+        CassandraEndpoint cassandraEndpoint = getMandatoryEndpoint(endpointUri, CassandraEndpoint.class);
+
+        assertEquals("camel_ks", cassandraEndpoint.getKeyspace());
+        assertEquals(CQL, cassandraEndpoint.getCql());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java
new file mode 100644
index 0000000..17753f7
--- /dev/null
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra;
+
+import org.apache.camel.component.cassandra.CassandraComponent;
+import com.datastax.driver.core.Cluster;
+import java.util.*;
+import org.apache.camel.impl.DefaultCamelContext;
+import static org.hamcrest.Matchers.*;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.junit.Before;
+
+/**
+ * Unit test for {@link CassandraComponent}: checks the Cluster.Builder
+ * produced from the remaining part of the endpoint URI.
+ */
+public class CassandraComponentClusterBuilderTest {
+    private final CassandraComponent component = new CassandraComponent();
+
+    @Before
+    public void setUp() {
+        component.setCamelContext(new DefaultCamelContext());
+    }
+
+    @Test
+    public void testClusterBuilder_Basic() {
+        Map<String, Object> parameters = new HashMap<String, Object>();
+        parameters.put("clusterName", "cluster");
+        Cluster.Builder builder = component.clusterBuilder("127.0.0.1,127.0.0.2/keyspace", parameters);
+
+        assertEquals(2, builder.getContactPoints().size());
+        assertContactPointHost(builder, 0, "127.0.0.1");
+        assertContactPointHost(builder, 1, "127.0.0.2");
+        assertEquals("cluster", builder.getClusterName());
+        assertEquals("keyspace", parameters.get("keyspace"));
+    }
+
+    @Test
+    public void testClusterBuilder_Port() {
+        Map<String, Object> parameters = new HashMap<String, Object>();
+        parameters.put("clusterName", "cluster");
+        Cluster.Builder builder = component.clusterBuilder("127.0.0.1,127.0.0.2:1234/keyspace", parameters);
+
+        assertEquals(2, builder.getContactPoints().size());
+        assertContactPointHost(builder, 0, "127.0.0.1");
+        assertEquals(1234, builder.getContactPoints().get(0).getPort());
+        assertContactPointHost(builder, 1, "127.0.0.2");
+        assertEquals(1234, builder.getConfiguration().getProtocolOptions().getPort());
+        assertEquals("cluster", builder.getClusterName());
+        assertEquals("keyspace", parameters.get("keyspace"));
+    }
+
+    @Test
+    public void testClusterBuilder_Simplest() {
+        Map<String, Object> parameters = new HashMap<String, Object>();
+        Cluster.Builder builder = component.clusterBuilder("127.0.0.1", parameters);
+
+        assertEquals(1, builder.getContactPoints().size());
+        assertContactPointHost(builder, 0, "127.0.0.1");
+        assertNull(parameters.get("keyspace"));
+    }
+
+    /**
+     * Checks the host name of the contact point at given index; "localhost"
+     * is accepted as an alternative to the expected address.
+     */
+    private static void assertContactPointHost(Cluster.Builder builder, int index, String expectedAddress) {
+        assertThat(builder.getContactPoints().get(index).getHostName(), isOneOf(expectedAddress, "localhost"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7be546af/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
new file mode 100644
index 0000000..0b4f0ed
--- /dev/null
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentConsumerTest.java
@@ -0,0 +1,73 @@
+package org.apache.camel.component.cassandra;
+
+import com.datastax.driver.core.Row;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.cassandraunit.CassandraCQLUnit;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the body type produced by the Cassandra consumer, with the default
+ * result set conversion and with the ONE strategy.
+ */
+public class CassandraComponentConsumerTest extends CamelTestSupport {
+    private static final String CQL = "select login, first_name, last_name from camel_user";
+
+    @Rule
+    public CassandraCQLUnit cassandra = CassandraUnitUtils.cassandraCQLUnit();
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        CassandraUnitUtils.startEmbeddedCassandra();
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        CassandraUnitUtils.cleanEmbeddedCassandra();
+    }
+
+    @Test
+    public void testConsume_All() throws Exception {
+        MockEndpoint resultAll = getMockEndpoint("mock:resultAll");
+        resultAll.expectedMinimumMessageCount(1);
+        resultAll.whenAnyExchangeReceived(bodyTypeChecker(List.class));
+        resultAll.await(1, TimeUnit.SECONDS);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testConsume_One() throws Exception {
+        MockEndpoint resultOne = getMockEndpoint("mock:resultOne");
+        resultOne.expectedMinimumMessageCount(1);
+        resultOne.whenAnyExchangeReceived(bodyTypeChecker(Row.class));
+        resultOne.await(1, TimeUnit.SECONDS);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Creates a processor asserting that the in message body is an instance
+     * of the expected type.
+     */
+    private static Processor bodyTypeChecker(final Class<?> expectedType) {
+        return new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Object body = exchange.getIn().getBody();
+                assertTrue(expectedType.isInstance(body));
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                final String selectUri = "cql://localhost/camel_ks?cql=" + CQL;
+                from(selectUri).to("mock:resultAll");
+                from(selectUri + "&resultSetConversionStrategy=ONE").to("mock:resultOne");
+            }
+        };
+    }
+}