You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:31:06 UTC

[2/6] storm git commit: STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector

STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2997cf9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2997cf9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2997cf9

Branch: refs/heads/master
Commit: c2997cf9f116297812505451269568484196b4c0
Parents: 6d7b968
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Sun Dec 6 22:23:29 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:19 2015 +0100

----------------------------------------------------------------------
 external/storm-cassandra/README.md              |  70 ++++++---
 .../cassandra/DynamicStatementBuilder.java      |  90 +++--------
 .../query/BaseCQLStatementTupleMapper.java      |  51 +++++++
 .../query/BatchStatementTupleMapper.java        |  57 -------
 .../cassandra/query/CQLClauseTupleMapper.java   |  36 -----
 .../cassandra/query/CQLStatementBuilder.java    |   4 +-
 .../query/CQLStatementTupleMapper.java          |  19 ---
 .../cassandra/query/CQLTableTupleMapper.java    |  39 -----
 .../cassandra/query/CQLValuesTupleMapper.java   |  74 ---------
 .../apache/storm/cassandra/query/Column.java    |  74 +++++++++
 .../apache/storm/cassandra/query/CqlMapper.java |  86 +++++++++++
 .../query/SimpleCQLStatementTupleMapper.java    |  51 -------
 .../builder/BoundCQLStatementMapperBuilder.java | 103 +++++++++++++
 .../SimpleCQLStatementMapperBuilder.java        |  91 +++++++++++
 .../impl/BatchCQLStatementTupleMapper.java      |  58 +++++++
 .../impl/BoundCQLStatementTupleMapper.java      | 106 +++++++++++++
 .../query/impl/BoundStatementMapperBuilder.java | 109 -------------
 .../query/impl/InsertStatementBuilder.java      | 153 -------------------
 .../query/impl/PreparedStatementBinder.java     | 136 +++++++++++++++++
 .../query/impl/RoutingKeyGenerator.java         |  52 +++++++
 .../query/impl/SimpleCQLStatementMapper.java    |  88 +++++++++++
 .../query/impl/UpdateStatementBuilder.java      | 118 --------------
 .../cassandra/query/selector/FieldSelector.java |   6 +-
 .../cassandra/DynamicStatementBuilderTest.java  | 108 +++++++------
 .../bolt/BatchCassandraWriterBoltTest.java      |  10 +-
 .../cassandra/bolt/CassandraWriterBoltTest.java |  10 +-
 26 files changed, 991 insertions(+), 808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 943fe5e..1edf708 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -39,17 +39,24 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```java
 
     new CassandraWriterBolt(
-        insertInto("album")
-            .values(
-                with(fields("title", "year", "performer", "genre", "tracks")
-                ).build());
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(
+                    fields("title", "year", "performer", "genre", "tracks")
+                 )
+            )
+    );
 ```
+
 ##### Insert query including all tuple fields.
 ```java
 
     new CassandraWriterBolt(
-        insertInto("album")
-            .values(all()).build());
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with( all() )
+            )
+    );
 ```
 
 ##### Insert multiple queries from one input tuple.
@@ -57,32 +64,49 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 
     new CassandraWriterBolt(
         async(
-        insertInto("titles_per_album").values(all()),
-        insertInto("titles_per_performer").values(all())
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
         )
     );
 ```
 
-##### Insert query including some fields with aliases
+##### Insert query using QueryBuilder
 ```java
 
     new CassandraWriterBolt(
-        insertInto("album")
-            .values(
-                with(field("ti"),as("title",
-                     field("ye").as("year")),
-                     field("pe").as("performer")),
-                     field("ge").as("genre")),
-                     field("tr").as("tracks")),
-                     ).build());
+        async(
+            simpleQuery("INSERT INTO album (title,year,perfomer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(all()))
+            )
+    )
 ```
 
 ##### Insert query with static bound query
 ```java
 
     new CassandraWriterBolt(
-         boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);")
-            .bind(all());
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .bind(all());
+         )
+    );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+    new CassandraWriterBolt(
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
+                .bind(
+                    field("ti"),as("title"),
+                    field("ye").as("year")),
+                    field("pe").as("performer")),
+                    field("ge").as("genre")),
+                    field("tr").as("tracks"))
+                ).byNamedSetters()
+         )
+    );
 ```
 
 ##### Insert query with bound statement load from storm configuration
@@ -106,14 +130,14 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 
     // Logged
     new CassandraWriterBolt(loggedBatch(
-        insertInto("title_per_album").values(all())
-        insertInto("title_per_performer").values(all())
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
         )
     );
 // UnLogged
     new CassandraWriterBolt(unLoggedBatch(
-        insertInto("title_per_album").values(all())
-        insertInto("title_per_performer").values(all())
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
         )
     );
 ```

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index deea8da..ce53ea3 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -19,10 +19,11 @@
 package org.apache.storm.cassandra;
 
 import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
 import org.apache.storm.cassandra.query.*;
-import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder;
-import org.apache.storm.cassandra.query.impl.InsertStatementBuilder;
-import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
+import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
 import org.apache.storm.cassandra.query.selector.FieldSelector;
 
 import java.io.Serializable;
@@ -33,73 +34,32 @@ public class DynamicStatementBuilder implements Serializable {
     private DynamicStatementBuilder() {
     }
 
-    /**
-     * Builds a new insert statement for the specified table.
-     *
-     * @param table the table's name.
-     * @return a new {@link InsertStatementBuilder} instance.
-     */
-    public static final InsertStatementBuilder insertInto(String table) {
-        return new InsertStatementBuilder(table);
-    }
-    /**
-     * Builds a new insert statement based on the specified CQL mapper.
-     *
-     * @param mapper the CQL mapper.
-     * @return a new {@link InsertStatementBuilder} instance.
-     */
-    public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) {
-        return new InsertStatementBuilder(mapper);
-    }
-    /**
-     * Builds a new insert statement for the specified keyspace and table.
-     *
-     * @param ks the keyspace to use.
-     * @param table the table's name.
-     * @return a new {@link InsertStatementBuilder} instance.
-     */
-    public static final InsertStatementBuilder insertInto(String ks, String table) {
-        return new InsertStatementBuilder(table, ks);
+    public static final SimpleCQLStatementMapperBuilder simpleQuery(String queryString) {
+        return new SimpleCQLStatementMapperBuilder(queryString);
     }
 
-    /**
-     * Builds a new update statement for the specified table.
-     *
-     * @param table the table's name.
-     * @return a new {@link UpdateStatementBuilder} instance.
-     */
-    public static final UpdateStatementBuilder update(String table) {
-        return new UpdateStatementBuilder(table);
-    }
-
-    /**
-     * Builds a new update statement for the specified keyspace and table.
-     *
-     * @param table the table's name.
-     * @return a new {@link UpdateStatementBuilder} instance.
-     */
-    public static final UpdateStatementBuilder update(String ks, String table) {
-        return new UpdateStatementBuilder(table, ks);
+    public static final SimpleCQLStatementMapperBuilder simpleQuery(BuiltStatement builtStatement) {
+        return new SimpleCQLStatementMapperBuilder(builtStatement);
     }
 
     /**
      * Builds a new bound statement based on the specified query.
      *
      * @param cql the query.
-     * @return a new {@link BoundStatementMapperBuilder} instance.
+     * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
      */
-    public static final BoundStatementMapperBuilder boundQuery(String cql) {
-        return new BoundStatementMapperBuilder(cql);
+    public static final BoundCQLStatementMapperBuilder boundQuery(String cql) {
+        return new BoundCQLStatementMapperBuilder(cql);
     }
 
     /**
      * Builds a new bound statement identified by the given field.
      *
      * @param field a context used to resolve the cassandra query.
-     * @return a new {@link BoundStatementMapperBuilder} instance.
+     * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
      */
-    public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) {
-        return new BoundStatementMapperBuilder(field);
+    public static final BoundCQLStatementMapperBuilder boundQuery(ContextQuery field) {
+        return new BoundCQLStatementMapperBuilder(field);
     }
 
     /**
@@ -115,27 +75,27 @@ public class DynamicStatementBuilder implements Serializable {
     /**
      * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders.
      */
-    public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
+    public static final BatchCQLStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
     }
     /**
      * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
      */
-    public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
+    public static final BatchCQLStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
     }
     /**
      * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders.
      */
-    public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
+    public static final BatchCQLStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders);
     }
 
-    private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
+    private static BatchCQLStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
         List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length);
         for(CQLStatementBuilder b : Arrays.asList(builders))
             mappers.add(b.build());
-        return new BatchStatementTupleMapper(type, mappers);
+        return new BatchCQLStatementTupleMapper(type, mappers);
     }
 
     /**
@@ -181,19 +141,11 @@ public class DynamicStatementBuilder implements Serializable {
         return fl.toArray(new FieldSelector[size]);
     }
 
-    /**
-     * Includes only the specified tuple fields.
-     *
-     * @param fields a list of field selector.
-     */
-    public static final CQLValuesTupleMapper with(final FieldSelector... fields) {
-        return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields));
-    }
 
     /**
      * Includes all tuple fields.
      */
-    public static final CQLValuesTupleMapper all() {
-        return new CQLValuesTupleMapper.AllTupleMapper();
+    public static final CqlMapper.DefaultCqlMapper all() {
+        return new CqlMapper.DefaultCqlMapper();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
new file mode 100644
index 0000000..c9ba6fa
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
+ *
+ */
+public abstract class BaseCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        return Arrays.asList(map(tuple));
+    }
+
+    /**
+     * Maps a given tuple to a single CQL statements.
+     *
+     * @param tuple the incoming tuple to map.
+     * @return a list of {@link com.datastax.driver.core.Statement}.
+     */
+    public abstract Statement map(ITuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
deleted file mode 100644
index 32ff1de..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-
-public class BatchStatementTupleMapper implements CQLStatementTupleMapper {
-
-    private final List<CQLStatementTupleMapper> mappers;
-    private final BatchStatement.Type type;
-
-    /**
-     * Creates a new {@link BatchStatementTupleMapper} instance.
-     * @param type
-     * @param mappers
-     */
-    public BatchStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
-        this.mappers = new ArrayList<>(mappers);
-        this.type = type;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Statement> map(Map conf, Session session, ITuple tuple) {
-        final BatchStatement batch = new BatchStatement(this.type);
-        for(CQLStatementTupleMapper m : mappers)
-            batch.addAll(m.map(conf, session, tuple));
-        return Arrays.asList((Statement)batch);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
deleted file mode 100644
index baa4685..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.querybuilder.Clause;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to
- * a list of cassandra {@link com.datastax.driver.core.querybuilder.Clause}s.
- *
- */
-public interface CQLClauseTupleMapper extends Serializable {
-
-    List<Clause> map(ITuple tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
index 6c04d1c..9ddb58a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
@@ -21,11 +21,11 @@ package org.apache.storm.cassandra.query;
 import java.io.Serializable;
 
 
-public interface CQLStatementBuilder extends Serializable {
+public interface CQLStatementBuilder<T extends CQLStatementTupleMapper> extends Serializable {
 
     /**
      * Builds a new {@link CQLStatementTupleMapper} instance.
      * @return a new CQLStatementMapper instance.
      */
-    <T extends CQLStatementTupleMapper> T build();
+    T build();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
index 32c86dd..b100d9a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -38,10 +38,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
  */
 public interface CQLStatementTupleMapper extends Serializable {
 
-    public static final String FIELD_KEYSPACE =  "keyspace";
-    public static final String FIELD_TABLE    =  "table";
-    public static final String FIELD_VALUES   =  "value";
-
     /**
      * Maps a given tuple to one or multiple CQL statements.
      *
@@ -52,21 +48,6 @@ public interface CQLStatementTupleMapper extends Serializable {
      */
     List<Statement> map(Map conf, Session session, ITuple tuple);
 
-    public static class InsertCQLStatementTupleMapper implements CQLStatementTupleMapper {
-        @Override
-        public List<Statement> map(Map conf, Session session, ITuple tuple) {
-            Optional<String> ks = Optional.fromNullable(tuple.contains(FIELD_KEYSPACE) ? tuple.getStringByField(FIELD_KEYSPACE) : null);
-            String table = tuple.getStringByField(FIELD_TABLE);
-            Map<String, Object> values = (Map<String, Object>) tuple.getValueByField(FIELD_VALUES);
-
-            final Insert stmt = (ks.isPresent()) ? insertInto(ks.get(), table) : insertInto(table);
-            for(Map.Entry<String, Object> v : values.entrySet())
-                stmt.value(v.getKey(), v.getValue());
-
-            return Arrays.asList((Statement)stmt);
-        }
-    }
-
     public static class DynamicCQLStatementTupleMapper implements CQLStatementTupleMapper {
         private List<CQLStatementBuilder> builders;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
deleted file mode 100644
index 574eac9..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to a table name.
- *
- */
-public interface CQLTableTupleMapper extends Serializable {
-
-    /**
-     * Returns a table's name from the specified tuple.
-     *
-     * @param tuple the incoming tuple.
-     * @return the table name.
-     */
-    String map(ITuple tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
deleted file mode 100644
index 2cb8e4c..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import org.apache.storm.cassandra.query.selector.FieldSelector;
-
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to Map of values of a CQL statement.
- *
- */
-public interface CQLValuesTupleMapper extends Serializable {
-
-    /**
-     * Map the specified {@code tuple} to values of CQL statement(s).
-     * @param tuple the incoming tuple to map.
-     * @return values of CQL statement(s)
-     */
-    Map<String, Object> map(ITuple tuple);
-
-    /**
-     * Default {@link CQLValuesTupleMapper} implementation to get specifics tuple's fields.
-     */
-    public static class WithFieldTupleMapper implements CQLValuesTupleMapper {
-        private List<FieldSelector> fields;
-
-        public WithFieldTupleMapper(List<FieldSelector> fields) {
-            this.fields = fields;
-        }
-
-        @Override
-        public Map<String, Object> map(ITuple tuple) {
-            Map<String, Object> ret = new LinkedHashMap<>();
-            for(FieldSelector fs : fields)
-                fs.selectAndPut(tuple, ret);
-            return ret;
-        }
-    }
-
-    /**
-     * Default {@link CQLValuesTupleMapper} implementation to get all tuple's fields.
-     */
-    public static class AllTupleMapper implements CQLValuesTupleMapper {
-        @Override
-        public Map<String, Object> map(ITuple tuple) {
-            Map<String, Object> ret = new LinkedHashMap<>();
-            for(String name  : tuple.getFields())
-                ret.put(name, tuple.getValueByField(name));
-            return ret;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
new file mode 100644
index 0000000..06643a5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class Column<T> implements Serializable {
+
+    private final String columnName;
+    private final T val;
+
+    public Column(String columnName, T val) {
+        this.columnName = columnName;
+        this.val = val;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public T getVal() {
+        return val;
+    }
+
+    public boolean isNull() { return val == null;}
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Column column = (Column) o;
+
+        if (columnName != null ? !columnName.equals(column.columnName) : column.columnName != null) return false;
+        if (val != null ? !val.equals(column.val) : column.val != null) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = columnName != null ? columnName.hashCode() : 0;
+        result = 31 * result + (val != null ? val.hashCode() : 0);
+        return result;
+    }
+
+    public static Object[] getVals(List<Column> columns) {
+        List<Object> vals = new ArrayList<>(columns.size());
+        for(Column c : columns)
+            vals.add(c.getVal());
+        return vals.toArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
new file mode 100644
index 0000000..2ab8f92
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default interface to defines how a storm tuple maps to a list of columns representing a row in a database.
+ */
+public interface CqlMapper extends Serializable {
+
+    /**
+     * Maps the specified input tuple to a list of CQL columns.
+     * @param tuple the input tuple.
+     * @return the list of
+     */
+    List<Column> map(ITuple tuple);
+
+
+    public static final class SelectableCqlMapper implements CqlMapper {
+
+        private final List<FieldSelector> selectors;
+
+        /**
+         * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.DefaultCqlMapper} instance.
+         * @param selectors list of selectors used to extract column values from tuple.
+         */
+        public SelectableCqlMapper(List<FieldSelector> selectors) {
+            this.selectors = selectors;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public List<Column> map(ITuple tuple) {
+            List<Column> columns = new ArrayList<>(selectors.size());
+            for(FieldSelector selector : selectors)
+                columns.add(selector.select(tuple));
+            return columns;
+        }
+    }
+
+    /**
+     * Default {@link CqlMapper} to map all tuple values to column.
+     */
+    public static final class DefaultCqlMapper implements CqlMapper {
+
+        /**
+         * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.DefaultCqlMapper} instance.
+         */
+        public DefaultCqlMapper() {}
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public List<Column> map(ITuple tuple) {
+            List<Column> columns = new ArrayList<>(tuple.size());
+            for(String name  : tuple.getFields())
+                columns.add(new Column(name, tuple.getValueByField(name)));
+            return columns;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
deleted file mode 100644
index 683824e..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
- *
- */
-public abstract class SimpleCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Statement> map(Map conf, Session session, ITuple tuple) {
-        return Arrays.asList(map(tuple));
-    }
-
-    /**
-     * Maps a given tuple to a single CQL statements.
-     *
-     * @param tuple the incoming tuple to map.
-     * @return a list of {@link com.datastax.driver.core.Statement}.
-     */
-    public abstract Statement map(ITuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..52502b0
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.BoundCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.PreparedStatementBinder;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.storm.cassandra.query.ContextQuery.*;
+
+public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<BoundCQLStatementTupleMapper>,  Serializable {
+
+    private final ContextQuery contextQuery;
+
+    private CqlMapper mapper;
+
+    private List<String> routingKeys;
+
+    private PreparedStatementBinder binder;
+
+    /**
+     * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+     * @param cql
+     */
+    public BoundCQLStatementMapperBuilder(String cql) {
+        this.contextQuery = new StaticContextQuery(cql);
+    }
+
+    /**
+     * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+     * @param contextQuery
+     */
+    public BoundCQLStatementMapperBuilder(ContextQuery contextQuery) {
+        this.contextQuery = contextQuery;
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param fields a list of field selector.
+     */
+    public final BoundCQLStatementMapperBuilder bind(final FieldSelector... fields) {
+        this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+        return this;
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param mapper a column mapper.
+     */
+    public final BoundCQLStatementMapperBuilder bind(CqlMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public final BoundCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+        this.routingKeys = Arrays.asList(fields);
+        return this;
+    }
+
+    public final BoundCQLStatementMapperBuilder byNamedSetters() {
+        this.binder = new PreparedStatementBinder.CQL3NamedSettersBinder();
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public BoundCQLStatementTupleMapper build() {
+        return new BoundCQLStatementTupleMapper(contextQuery, mapper, getRoutingKeyGenerator(), getStatementBinder());
+    }
+
+    private RoutingKeyGenerator getRoutingKeyGenerator() {
+        return (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null;
+    }
+
+    private PreparedStatementBinder getStatementBinder() {
+        return (binder != null) ? binder : new PreparedStatementBinder.DefaultBinder();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..401f1bf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Default class to build {@link org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper} instance.
+ */
+public class SimpleCQLStatementMapperBuilder implements CQLStatementBuilder<SimpleCQLStatementMapper>, Serializable {
+
+    private final String queryString;
+
+    private CqlMapper mapper;
+
+    private List<String> routingKeys;
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+     * @param queryString a valid CQL query string.
+     */
+    public SimpleCQLStatementMapperBuilder(String queryString) {
+        this.queryString = queryString;
+    }
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+     * @param builtStatement a query built statements
+     */
+    public SimpleCQLStatementMapperBuilder(BuiltStatement builtStatement) {
+        this.queryString = builtStatement.getQueryString();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SimpleCQLStatementMapper build() {
+        return new SimpleCQLStatementMapper(queryString, mapper, (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null );
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param fields a list of field selector.
+     */
+    public final SimpleCQLStatementMapperBuilder with(final FieldSelector... fields) {
+        this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+        return this;
+    }
+
+    public final SimpleCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+        this.routingKeys = Arrays.asList(fields);
+        return this;
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param mapper a column mapper.
+     */
+    public final SimpleCQLStatementMapperBuilder with(CqlMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
new file mode 100644
index 0000000..fe948f5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+public class BatchCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+    private final List<CQLStatementTupleMapper> mappers;
+    private final BatchStatement.Type type;
+
+    /**
+     * Creates a new {@link BatchCQLStatementTupleMapper} instance.
+     * @param type
+     * @param mappers
+     */
+    public BatchCQLStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
+        this.mappers = new ArrayList<>(mappers);
+        this.type = type;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        final BatchStatement batch = new BatchStatement(this.type);
+        for(CQLStatementTupleMapper m : mappers)
+            batch.addAll(m.map(conf, session, tuple));
+        return Arrays.asList((Statement)batch);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
new file mode 100644
index 0000000..8cce418
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+    private final ContextQuery contextQuery;
+
+    private final CqlMapper mapper;
+
+    private Map<String, PreparedStatement> cache = new HashMap<>();
+
+    private final RoutingKeyGenerator rkGenerator;
+
+    private final PreparedStatementBinder binder;
+
+    /**
+     * Creates a new {@link BoundCQLStatementTupleMapper} instance.
+     *
+     * @param contextQuery
+     * @param mapper
+     * @param mapper
+     * @param rkGenerator
+     * @param binder
+     */
+    public BoundCQLStatementTupleMapper(ContextQuery contextQuery, CqlMapper mapper, RoutingKeyGenerator rkGenerator, PreparedStatementBinder binder) {
+        Preconditions.checkNotNull(contextQuery, "ContextQuery must not be null");
+        Preconditions.checkNotNull(mapper, "Mapper should not be null");
+        this.contextQuery = contextQuery;
+        this.mapper = mapper;
+        this.rkGenerator = rkGenerator;
+        this.binder = binder;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map config, Session session, ITuple tuple) {
+        final List<Column> columns = mapper.map(tuple);
+
+        final String query = contextQuery.resolves(config, tuple);
+
+        PreparedStatement statement = getPreparedStatement(session, query);
+        if(hasRoutingKeys()) {
+            List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple);
+            if( keys.size() == 1)
+                statement.setRoutingKey(keys.get(0));
+            else
+                statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()]));
+        }
+
+        return Arrays.asList((Statement) this.binder.apply(statement, columns));
+    }
+
+    private boolean hasRoutingKeys() {
+        return rkGenerator != null;
+    }
+
+    /**
+     * Get or prepare a statement using the specified session and the query.
+     * *
+     * @param session The cassandra session.
+     * @param query The CQL query to prepare.
+     */
+    private PreparedStatement getPreparedStatement(Session session, String query) {
+        PreparedStatement statement = cache.get(query);
+        if( statement == null) {
+            statement = session.prepare(query);
+            cache.put(query, statement);
+        }
+        return statement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
deleted file mode 100644
index b7ded14..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.ContextQuery;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
-
-
-public class BoundStatementMapperBuilder implements Serializable {
-    private final ContextQuery contextQuery;
-
-    /**
-     * Creates a new {@link BoundStatementMapperBuilder} instance.
-     * @param cql
-     */
-    public BoundStatementMapperBuilder(String cql) {
-        this.contextQuery = new StaticContextQuery(cql);
-    }
-
-    /**
-     * Creates a new {@link BoundStatementMapperBuilder} instance.
-     * @param contextQuery
-     */
-    public BoundStatementMapperBuilder(ContextQuery contextQuery) {
-        this.contextQuery = contextQuery;
-    }
-
-    public CQLStatementTupleMapper bind(final CQLValuesTupleMapper mapper) {
-        return new CQLBoundStatementTupleMapper(contextQuery, mapper);
-    }
-
-    public static class CQLBoundStatementTupleMapper implements CQLStatementTupleMapper {
-
-        private final ContextQuery contextQuery;
-
-        private final CQLValuesTupleMapper mapper;
-
-        // should be a lru-map or weakhashmap else this may lead to memory leaks.
-        private Map<String, PreparedStatement> cache = new HashMap<>();
-
-        /**
-         * Creates a new {@link CQLBoundStatementTupleMapper} instance.
-         *
-         * @param contextQuery
-         * @param mapper
-         */
-        CQLBoundStatementTupleMapper(ContextQuery contextQuery, CQLValuesTupleMapper mapper) {
-            this.contextQuery = contextQuery;
-            this.mapper = mapper;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public List<Statement> map(Map config, Session session, ITuple tuple) {
-            final Map<String, Object> values = mapper.map(tuple);
-            final String query = contextQuery.resolves(config, tuple);
-            Object[] objects = values.values().toArray(new Object[values.size()]);
-            PreparedStatement statement = getPreparedStatement(session, query);
-            // todo bind objects in the same sequence as the statement expects.
-            return Arrays.asList((Statement) statement.bind(objects));
-        }
-
-        /**
-         * Get or prepare a statement using the specified session and the query.
-         * *
-         * @param session The cassandra session.
-         * @param query The CQL query to prepare.
-         */
-        private PreparedStatement getPreparedStatement(Session session, String query) {
-            PreparedStatement statement = cache.get(query);
-            if (statement == null) {
-                statement = session.prepare(query);
-                cache.put(query, statement);
-            }
-            return statement;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
deleted file mode 100644
index 8eddb04..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.base.Preconditions;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLTableTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class InsertStatementBuilder<T extends CQLValuesTupleMapper> implements CQLStatementBuilder {
-    /**
-     * Optional name of the table into insert.
-     */
-    protected final String table;
-    /**
-     * Optional keyspace name to insert.
-     */
-    protected final String keyspace;
-    /**
-     * Optional mapper to retrieve a table name from tuple.
-     */
-    protected final CQLTableTupleMapper mapper2TableName;
-
-    protected T valuesMapper;
-
-    protected boolean ifNotExist;
-
-    protected Integer ttlInSeconds;
-
-    protected ConsistencyLevel level;
-
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param table    name of the table into insert.
-     */
-    public InsertStatementBuilder(String table) {
-        this(null, table, null);
-    }
-
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param table    name of the table into insert.
-     * @param keyspace the keyspace's name to which the table belongs
-     */
-    public InsertStatementBuilder(String table, String keyspace) {
-        this(keyspace, table, null);
-    }
-
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param mapper2TableName the mapper that specify the table in which insert
-     * @param keyspace         the name of the keyspace to which the table belongs
-     */
-    public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName, String keyspace) {
-        this(keyspace, null, mapper2TableName);
-    }
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param mapper2TableName the mapper that specify the table in which insert
-     */
-    public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName) {
-        this(null, null, mapper2TableName);
-    }
-
-    private InsertStatementBuilder(String keyspace, String table, CQLTableTupleMapper mapper2TableName) {
-        this.keyspace = keyspace;
-        this.table = table;
-        this.mapper2TableName = mapper2TableName;
-    }
-
-    /**
-     * Adds "IF NOT EXISTS" clause to the statement.
-     * @see com.datastax.driver.core.querybuilder.Insert#ifNotExists()
-     */
-    public InsertStatementBuilder ifNotExists() {
-        this.ifNotExist = true;
-        return this;
-    }
-
-    public InsertStatementBuilder usingTTL(Long time, TimeUnit unit) {
-        this.ttlInSeconds = (int) unit.toSeconds(time);
-        return this;
-    }
-
-    /**
-     * Sets the consistency level used for this statement.
-     *
-     * @param level a ConsistencyLevel.
-     */
-    public InsertStatementBuilder withConsistencyLevel(ConsistencyLevel level) {
-        this.level = level;
-        return this;
-    }
-
-    /**
-     * Maps tuple to values.
-     *
-     * @see com.datastax.driver.core.querybuilder.Insert#value(String, Object)
-     */
-    public InsertStatementBuilder values(final T valuesMapper) {
-        this.valuesMapper = valuesMapper;
-        return this;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public SimpleCQLStatementTupleMapper build() {
-        Preconditions.checkState(null != table, "Table name must not be null");
-        return new SimpleCQLStatementTupleMapper() {
-            @Override
-            public Statement map(ITuple tuple) {
-                Insert stmt = QueryBuilder.insertInto(keyspace, table);
-                for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
-                    stmt.value(entry.getKey(), entry.getValue());
-                if (ttlInSeconds != null) stmt.using(QueryBuilder.ttl(ttlInSeconds));
-                if (ifNotExist) stmt.ifNotExists();
-                if (level != null) stmt.setConsistencyLevel(level);
-                return stmt;
-            }
-        };
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
new file mode 100644
index 0000000..7a6c78d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import com.datastax.driver.core.*;
+import org.apache.storm.cassandra.query.Column;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ *
+ */
+public interface PreparedStatementBinder extends Serializable {
+
+    public BoundStatement apply(PreparedStatement statement, List<Column> columns);
+
+    public static final class DefaultBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            Object[] values = Column.getVals(columns);
+            return statement.bind(values);
+        }
+    }
+
+    public static final class CQL3NamedSettersBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            Object[] values = Column.getVals(columns);
+
+            BoundStatement boundStatement = statement.bind();
+            for(Column col : columns) {
+                // For native protocol V3 or below, all variables must be bound.
+                // With native protocol V4 or above, variables can be left unset,
+                // in which case they will be ignored server side (no tombstones will be generated).
+                if(col.isNull()) {
+                    boundStatement.setToNull(col.getColumnName());
+                } else {
+                    bind(boundStatement, col.getColumnName(), col.getVal());
+                }
+            }
+            return statement.bind(values);
+        }
+
+        /**
+         * This ugly method comes from {@link com.datastax.driver.core.TypeCodec#getDataTypeFor(Object)}.
+         */
+        static void bind(BoundStatement statement, String name, Object value) {
+            // Starts with ByteBuffer, so that if already serialized value are provided, we don't have the
+            // cost of testing a bunch of other types first
+            if (value instanceof ByteBuffer)
+                statement.setBytes(name, (ByteBuffer)value);
+
+            if (value instanceof Number) {
+                if (value instanceof Integer)
+                    statement.setInt(name, (Integer)value);
+                if (value instanceof Long)
+                    statement.setLong(name, (Long) value);
+                if (value instanceof Float)
+                    statement.setFloat(name, (Float) value);
+                if (value instanceof Double)
+                    statement.setDouble(name, (Double)value);
+                if (value instanceof BigDecimal)
+                    statement.setDecimal(name, (BigDecimal)value);
+                if (value instanceof BigInteger)
+                    statement.setVarint(name, (BigInteger)value);
+                throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
+            }
+
+            if (value instanceof String)
+                statement.setString(name, (String)value);
+
+            if (value instanceof Boolean)
+                statement.setBool(name, (Boolean)value);
+
+            if (value instanceof InetAddress)
+                statement.setInet(name, (InetAddress)value);
+
+            if (value instanceof Date)
+                statement.setDate(name, (Date)value);
+
+            if (value instanceof UUID)
+                statement.setUUID(name, (UUID)value);
+
+            if (value instanceof List) {
+                statement.setList(name, (List)value);
+            }
+
+            if (value instanceof Set) {
+                statement.setSet(name, (Set)value);
+            }
+
+            if (value instanceof Map) {
+                statement.setMap(name, (Map) value);
+            }
+
+            if (value instanceof UDTValue) {
+                statement.setUDTValue(name, (UDTValue)value);
+            }
+
+            if (value instanceof TupleValue) {
+                statement.setTupleValue(name, (TupleValue) value);
+            }
+
+            throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
new file mode 100644
index 0000000..3f4f47e
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RoutingKeyGenerator implements Serializable {
+
+    private List<String> routingKeys;
+
+    /**
+     * Creates a new {@link RoutingKeyGenerator} instance.
+     * @param routingKeys
+     */
+    public RoutingKeyGenerator(List<String> routingKeys) {
+        Preconditions.checkNotNull(routingKeys);
+        this.routingKeys = routingKeys;
+    }
+
+    public List<ByteBuffer> getRoutingKeys(ITuple tuple) {
+        List<ByteBuffer> keys = new ArrayList<>(routingKeys.size());
+        for(String s : routingKeys) {
+            Object value = tuple.getValueByField(s);
+            keys.add(DataType.serializeValue(value, ProtocolVersion.NEWEST_SUPPORTED));
+        }
+        return keys;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
new file mode 100644
index 0000000..ab9adbf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class SimpleCQLStatementMapper implements CQLStatementTupleMapper {
+
+    private final String queryString;
+    private final CqlMapper mapper;
+    private final RoutingKeyGenerator rkGenerator;
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper) {
+        this(queryString, mapper, null);
+    }
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper, RoutingKeyGenerator rkGenerator) {
+        Preconditions.checkNotNull(queryString, "Query string must not be null");
+        Preconditions.checkNotNull(mapper, "Mapper should not be null");
+        this.queryString = queryString;
+        this.mapper = mapper;
+        this.rkGenerator = rkGenerator;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        List<Column> columns = mapper.map(tuple);
+        SimpleStatement statement = new SimpleStatement(queryString, Column.getVals(columns));
+
+        if(hasRoutingKeys()) {
+            List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple);
+            if( keys.size() == 1)
+                statement.setRoutingKey(keys.get(0));
+            else
+                statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()]));
+        }
+
+        return Arrays.asList((Statement) statement);
+    }
+
+    private boolean hasRoutingKeys() {
+        return rkGenerator != null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
deleted file mode 100644
index 70696ab..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Clause;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Update;
-import org.apache.storm.cassandra.query.CQLClauseTupleMapper;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
-
-public final class UpdateStatementBuilder implements CQLStatementBuilder {
-
-    /**
-     * The name of table to update. 
-     */
-    private final String table;
-    /**
-     * The keyspace of the table.
-     */
-    private final String keyspace;
-
-    private CQLValuesTupleMapper valuesMapper;
-
-    private CQLClauseTupleMapper clausesMapper;
-
-    private CQLClauseTupleMapper onlyIfClausesMapper;
-    /**
-     * Creates a new {@link UpdateStatementBuilder} instance.
-     * @param table the name of table to update.
-     */
-    public UpdateStatementBuilder(String table) {
-        this(table, null);
-    }
-
-    /**
-     * Creates a new {@link UpdateStatementBuilder} instance.
-     * @param table the name of table to update.
-     * @param keyspace the keyspace of the table.
-     */
-    public UpdateStatementBuilder(String table, String keyspace) {
-        this.table = table;
-        this.keyspace = keyspace;
-    }
-
-    /**
-     * Maps output tuple to values.
-     * @see com.datastax.driver.core.querybuilder.Update#with(com.datastax.driver.core.querybuilder.Assignment)
-     */
-    public UpdateStatementBuilder with(final CQLValuesTupleMapper values) {
-        this.valuesMapper = values;
-        return this;
-    }
-
-    /**
-     * Maps output tuple to some Where clauses.
-     * @see com.datastax.driver.core.querybuilder.Update#where(com.datastax.driver.core.querybuilder.Clause)
-     */
-    public UpdateStatementBuilder where(final CQLClauseTupleMapper clauses) {
-        this.clausesMapper = clauses;
-        return this;
-    }
-
-    /**
-     * Maps output tuple to some If clauses.
-     * @see com.datastax.driver.core.querybuilder.Update#onlyIf(com.datastax.driver.core.querybuilder.Clause)
-     */
-    public UpdateStatementBuilder onlyIf(final CQLClauseTupleMapper clauses) {
-        this.onlyIfClausesMapper = clauses;
-        return this;
-    }
-
-    @Override
-    public SimpleCQLStatementTupleMapper build() {
-        return new SimpleCQLStatementTupleMapper() {
-            @Override
-            public Statement map(ITuple tuple) {
-                Update stmt = QueryBuilder.update(keyspace, table);
-                for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
-                    stmt.with(set(entry.getKey(), entry.getValue()));
-                for(Clause clause : clausesMapper.map(tuple))
-                    stmt.where(clause);
-                if( hasIfClauses())
-                    for(Clause clause : onlyIfClausesMapper.map(tuple)) {
-                        stmt.onlyIf(clause);
-                    }
-                return stmt;
-            }
-        };
-    }
-
-    private boolean hasIfClauses() {
-        return onlyIfClausesMapper != null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
index b68138a..8ce651e 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -20,6 +20,7 @@ package org.apache.storm.cassandra.query.selector;
 
 import backtype.storm.tuple.ITuple;
 import com.datastax.driver.core.utils.UUIDs;
+import org.apache.storm.cassandra.query.Column;
 
 import java.io.Serializable;
 import java.util.Map;
@@ -41,9 +42,8 @@ public class FieldSelector implements Serializable {
         this.field = field;
     }
 
-    public void selectAndPut(ITuple t, Map<String, Object> values) {
-        values.put(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
-
+    public Column select(ITuple t) {
+        return new Column<>(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
     }
 
     /**