You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:31:05 UTC

[1/6] storm git commit: STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector

Repository: storm
Updated Branches:
  refs/heads/master df32755b1 -> 989cae6ec


http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 843bf6b..2593327 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -21,9 +21,15 @@ package org.apache.storm.cassandra;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.collect.Maps;
 import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
 import org.cassandraunit.CassandraCQLUnit;
 import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
 import org.junit.Assert;
@@ -31,20 +37,18 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 
+import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
 import static org.mockito.Mockito.when;
 
 public class DynamicStatementBuilderTest {
@@ -64,78 +68,92 @@ public class DynamicStatementBuilderTest {
         when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
     }
 
+    public static final String QUERY_STRING = "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?,?,?);";
+
     @Test
     public void shouldBuildMultipleStaticInsertStatementGivenKeyspaceAndAllMapper() {
-        executeStatementAndAssert(
-                async(
-                        insertInto("weather", "temperature").values(all()),
-                        insertInto("weather", "temperature").values(all())
-                ),
-                "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
-                "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
+        CQLStatementTupleMapper mapper = async(
+                simpleQuery(QUERY_STRING).with(all()),
+                simpleQuery(QUERY_STRING).with(all())
         );
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(2, stmts.size());
+        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
+        makeAssertSimpleStatement(QUERY_STRING, stmts.get(1));
     }
 
     @Test
     public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
-        executeStatementAndAssert(insertInto("weather", "temperature").values(all()).build(),
-                "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(1, stmts.size());
+        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
+    }
+
+    @Test
+    public void shouldBuildStaticInsertStatementUsingBuilderGivenKeyspaceAndAllMapper() {
+        Insert insert = QueryBuilder.insertInto("weather", "temperature")
+                .value("weather_station_id", "?")
+                .value("event_time", "?")
+                .value("temperature", "?");
+        SimpleCQLStatementMapper mapper = simpleQuery(insert).with(all()).build();
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(1, stmts.size());
+        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
     }
 
     @Test
     public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
-        executeStatementAndAssert(insertInto("temperature").values(all()).build(),
-                "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(1, stmts.size());
+        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
     }
 
     @Test
     public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        executeStatementAndAssert(insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature"))).build(),
-                "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")).build();
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(1, stmts.size());
+        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
     }
 
     @Test
     public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        executeBatchStatementAndAssert(loggedBatch(
-                insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
-        ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+        BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(1, stmts.size());
+        makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
     }
 
     @Test
     public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        executeBatchStatementAndAssert(unLoggedBatch(
-                insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
-        ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
-    }
-
-    private void executeBatchStatementAndAssert(CQLStatementTupleMapper mapper, String... results) {
-        List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-
-        BatchStatement statement = (BatchStatement) map.get(0);
-        Collection<Statement> statements = statement.getStatements();
-        Assert.assertEquals(results.length, statements.size());
-
-        for (Statement s : statements)
-            Assert.assertTrue(Arrays.asList(results).contains(s.toString()));
-    }
-
-
-    private void executeStatementAndAssert(CQLStatementTupleMapper mapper, String... expected) {
-        List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-
-        List<String> listExpected = Arrays.asList(expected);
-        for (int i = 0; i < map.size(); i++) {
-            Assert.assertEquals(listExpected.get(i), map.get(i).toString());
-        }
+        BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+        Assert.assertEquals(1, stmts.size());
+        makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
     }
 
     @Test
     public void shouldBuildStaticBoundStatement() {
-        CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weather_station_id, event_time, temperature) VALUES(?, ?, ?)")
-                .bind(with(field("weather_station_id"), field("event_time").now(), field("temperature")));
+        CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field("weather_station_id"), field("event_time").now(), field("temperature")));
         List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Statement statement = map.get(0);
         Assert.assertNotNull(statement);
     }
 
+    public void makeAssertSimpleStatement(String expected, Statement actual) {
+        Assert.assertTrue(actual instanceof SimpleStatement);
+        Assert.assertEquals(expected, actual.toString());
+        Assert.assertEquals(3, ((SimpleStatement)actual).getValues(ProtocolVersion.V3).length);
+    }
+
+    public void makeAssertBatchStatement(String expected, Statement actual) {
+        Assert.assertTrue(actual instanceof BatchStatement);
+        Collection<Statement> statements = ((BatchStatement) actual).getStatements();
+        for(Statement s : statements) {
+            Assert.assertEquals(expected, s.toString());
+            Assert.assertEquals(3, ((SimpleStatement)s).getValues(ProtocolVersion.V3).length);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 80eb95a..9a61309 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -21,14 +21,13 @@ package org.apache.storm.cassandra.bolt;
 import backtype.storm.topology.TopologyBuilder;
 import com.datastax.driver.core.ResultSet;
 import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
 
 
 /**
@@ -45,8 +44,9 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
         executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
     }
 
-    private SimpleCQLStatementTupleMapper getInsertInto() {
-        return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
+    private SimpleCQLStatementMapper getInsertInto() {
+        return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
+                .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
     }
 
     protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index 2aef53c..e70b6b0 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -21,14 +21,13 @@ package org.apache.storm.cassandra.bolt;
 import backtype.storm.topology.TopologyBuilder;
 import com.datastax.driver.core.ResultSet;
 import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
 
 /**
  *
@@ -44,8 +43,9 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
         executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
     }
 
-    private SimpleCQLStatementTupleMapper getInsertInto() {
-        return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
+    private SimpleCQLStatementMapper getInsertInto() {
+        return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
+                .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
     }
 
     protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {


[6/6] storm git commit: Added STORM-1348 to Changelog

Posted by bo...@apache.org.
Added STORM-1348 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/989cae6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/989cae6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/989cae6e

Branch: refs/heads/master
Commit: 989cae6eccbedef127bf5db56961c040f3fee14d
Parents: fa1ec6f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 11 14:20:22 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 11 14:20:22 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/989cae6e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e544a95..4652993 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1348: refactor API to remove Insert/Update builder in Cassandra connector
  * STORM-1206: Reduce logviewer memory usage through directory stream
  * STORM-1219: Fix HDFS and Hive bolt flush/acking
  * STORM-1150: Fix the authorization of Logviewer in method authorized-log-user?


[4/6] storm git commit: STORM-1348 - Fix tests added with STORM-1211

Posted by bo...@apache.org.
STORM-1348 - Fix tests added with STORM-1211


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0702f54d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0702f54d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0702f54d

Branch: refs/heads/master
Commit: 0702f54d72e1d18411fc24c9019e3e88b7d35e81
Parents: 054a7f3
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Mon Dec 14 11:43:19 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:22 2015 +0100

----------------------------------------------------------------------
 .../apache/storm/cassandra/trident/TridentTopologyTest.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0702f54d/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
index c775919..bdacfe4 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
@@ -45,7 +45,6 @@ import java.util.Random;
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
 import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
 
 /**
  *
@@ -109,7 +108,8 @@ public class TridentTopologyTest extends BaseTopologyTest {
         CassandraState.Options options = new CassandraState.Options(new CassandraContext());
         CQLStatementTupleMapper insertTemperatureValues = boundQuery(
                 "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
-                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+                .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
+                .build();
         options.withCQLStatementTupleMapper(insertTemperatureValues);
         return new CassandraStateFactory(options);
     }
@@ -117,7 +117,7 @@ public class TridentTopologyTest extends BaseTopologyTest {
     public CassandraStateFactory getSelectWeatherStationStateFactory() {
         CassandraState.Options options = new CassandraState.Options(new CassandraContext());
         CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
-                .bind(with(field("weather_station_id").as("id")));
+                .bind(field("weather_station_id").as("id")).build();
         options.withCQLStatementTupleMapper(insertTemperatureValues);
         options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
         return new CassandraStateFactory(options);


[3/6] storm git commit: STORM-1348 - clean imports

Posted by bo...@apache.org.
STORM-1348 - clean imports


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/054a7f38
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/054a7f38
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/054a7f38

Branch: refs/heads/master
Commit: 054a7f38685d88f491666ae658591ab377179d6e
Parents: c2997cf
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Tue Dec 8 17:44:33 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:21 2015 +0100

----------------------------------------------------------------------
 .../cassandra/DynamicStatementBuilder.java      |  9 +++++--
 .../storm/cassandra/Murmur3StreamGrouping.java  |  2 --
 .../query/CQLStatementTupleMapper.java          |  6 -----
 .../builder/BoundCQLStatementMapperBuilder.java |  7 ++++--
 .../SimpleCQLStatementMapperBuilder.java        |  3 ++-
 .../query/impl/PreparedStatementBinder.java     | 11 +++++++--
 .../cassandra/query/selector/FieldSelector.java |  2 --
 .../cassandra/DynamicStatementBuilderTest.java  | 26 +++++++++++---------
 .../bolt/BatchCassandraWriterBoltTest.java      |  4 +--
 .../cassandra/bolt/CassandraWriterBoltTest.java |  4 +--
 10 files changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index ce53ea3..4a98632 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -20,14 +20,19 @@ package org.apache.storm.cassandra;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.querybuilder.BuiltStatement;
-import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+import org.apache.storm.cassandra.query.CqlMapper;
 import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
 import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
 import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
 import org.apache.storm.cassandra.query.selector.FieldSelector;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 public class DynamicStatementBuilder implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
index 966aacd..992bfd0 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -26,8 +26,6 @@ import backtype.storm.tuple.Fields;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
index b100d9a..fc960dd 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -21,20 +21,14 @@ package org.apache.storm.cassandra.query;
 import backtype.storm.tuple.ITuple;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.google.common.base.Optional;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
-
 /**
  * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
- *
  */
 public interface CQLStatementTupleMapper extends Serializable {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
index 52502b0..a144830 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
@@ -18,14 +18,17 @@
  */
 package org.apache.storm.cassandra.query.builder;
 
-import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.ContextQuery;
+import org.apache.storm.cassandra.query.CqlMapper;
 import org.apache.storm.cassandra.query.impl.BoundCQLStatementTupleMapper;
 import org.apache.storm.cassandra.query.impl.PreparedStatementBinder;
 import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
 import org.apache.storm.cassandra.query.selector.FieldSelector;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.storm.cassandra.query.ContextQuery.*;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
index 401f1bf..29ad9c1 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
@@ -19,7 +19,8 @@
 package org.apache.storm.cassandra.query.builder;
 
 import com.datastax.driver.core.querybuilder.BuiltStatement;
-import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.CqlMapper;
 import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
 import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
 import org.apache.storm.cassandra.query.selector.FieldSelector;

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
index 7a6c78d..4606b05 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -18,7 +18,10 @@
  */
 package org.apache.storm.cassandra.query.impl;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.TupleValue;
+import com.datastax.driver.core.UDTValue;
 import org.apache.storm.cassandra.query.Column;
 
 import java.io.Serializable;
@@ -26,7 +29,11 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
index 8ce651e..bba7fb5 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -23,8 +23,6 @@ import com.datastax.driver.core.utils.UUIDs;
 import org.apache.storm.cassandra.query.Column;
 
 import java.io.Serializable;
-import java.util.Map;
-
 
 public class FieldSelector implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 2593327..800cad7 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -57,15 +57,19 @@ public class DynamicStatementBuilderTest {
 
     private static final Tuple mockTuple;
 
+    public static final String COL_WEATHER_STATION_ID = "weather_station_id";
+    public static final String COL_EVENT_TIME  = "event_time";
+    public static final String COL_TEMPERATURE = "temperature";
+
     @Rule
     public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
 
     static {
         mockTuple = Mockito.mock(Tuple.class);
-        when(mockTuple.getValueByField("weather_station_id")).thenReturn("1");
-        when(mockTuple.getValueByField("event_time")).thenReturn(NOW);
-        when(mockTuple.getValueByField("temperature")).thenReturn("0°C");
-        when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
+        when(mockTuple.getValueByField(COL_WEATHER_STATION_ID)).thenReturn("1");
+        when(mockTuple.getValueByField(COL_EVENT_TIME)).thenReturn(NOW);
+        when(mockTuple.getValueByField(COL_TEMPERATURE)).thenReturn("0°C");
+        when(mockTuple.getFields()).thenReturn(new Fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE));
     }
 
     public static final String QUERY_STRING = "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?,?,?);";
@@ -93,9 +97,9 @@ public class DynamicStatementBuilderTest {
     @Test
     public void shouldBuildStaticInsertStatementUsingBuilderGivenKeyspaceAndAllMapper() {
         Insert insert = QueryBuilder.insertInto("weather", "temperature")
-                .value("weather_station_id", "?")
-                .value("event_time", "?")
-                .value("temperature", "?");
+                .value(COL_WEATHER_STATION_ID, "?")
+                .value(COL_EVENT_TIME, "?")
+                .value(COL_TEMPERATURE, "?");
         SimpleCQLStatementMapper mapper = simpleQuery(insert).with(all()).build();
         List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Assert.assertEquals(1, stmts.size());
@@ -112,7 +116,7 @@ public class DynamicStatementBuilderTest {
 
     @Test
     public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")).build();
+        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)).build();
         List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Assert.assertEquals(1, stmts.size());
         makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
@@ -120,7 +124,7 @@ public class DynamicStatementBuilderTest {
 
     @Test
     public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+        BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
         List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Assert.assertEquals(1, stmts.size());
         makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
@@ -128,7 +132,7 @@ public class DynamicStatementBuilderTest {
 
     @Test
     public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+        BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
         List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Assert.assertEquals(1, stmts.size());
         makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
@@ -136,7 +140,7 @@ public class DynamicStatementBuilderTest {
 
     @Test
     public void shouldBuildStaticBoundStatement() {
-        CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field("weather_station_id"), field("event_time").now(), field("temperature")));
+        CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field(COL_WEATHER_STATION_ID), field(COL_EVENT_TIME).now(), field(COL_TEMPERATURE)));
         List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Statement statement = map.get(0);
         Assert.assertNotNull(statement);

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 9a61309..c5116e2 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -45,8 +45,8 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
     }
 
     private SimpleCQLStatementMapper getInsertInto() {
-        return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
-                .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
+        return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
+                .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
     }
 
     protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {

http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index e70b6b0..3d1b623 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -44,8 +44,8 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
     }
 
     private SimpleCQLStatementMapper getInsertInto() {
-        return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
-                .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
+        return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
+                .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
     }
 
     protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {


[5/6] storm git commit: Merge branch 'master' of https://github.com/fhussonnois/storm into STORM-1348

Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/fhussonnois/storm into STORM-1348

STORM-1348: refactor API to remove Insert/Update builder in Cassandra connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa1ec6f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa1ec6f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa1ec6f7

Branch: refs/heads/master
Commit: fa1ec6f7e2efb1b3dc85b67fd7341d5583f2ad1d
Parents: df32755 0702f54
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 11 14:19:45 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 11 14:19:45 2016 -0600

----------------------------------------------------------------------
 external/storm-cassandra/README.md              |  70 ++++++---
 .../cassandra/DynamicStatementBuilder.java      |  99 ++++--------
 .../storm/cassandra/Murmur3StreamGrouping.java  |   2 -
 .../query/BaseCQLStatementTupleMapper.java      |  51 +++++++
 .../query/BatchStatementTupleMapper.java        |  57 -------
 .../cassandra/query/CQLClauseTupleMapper.java   |  36 -----
 .../cassandra/query/CQLStatementBuilder.java    |   4 +-
 .../query/CQLStatementTupleMapper.java          |  25 ---
 .../cassandra/query/CQLTableTupleMapper.java    |  39 -----
 .../cassandra/query/CQLValuesTupleMapper.java   |  74 ---------
 .../apache/storm/cassandra/query/Column.java    |  74 +++++++++
 .../apache/storm/cassandra/query/CqlMapper.java |  86 +++++++++++
 .../query/SimpleCQLStatementTupleMapper.java    |  51 -------
 .../builder/BoundCQLStatementMapperBuilder.java | 106 +++++++++++++
 .../SimpleCQLStatementMapperBuilder.java        |  92 +++++++++++
 .../impl/BatchCQLStatementTupleMapper.java      |  58 +++++++
 .../impl/BoundCQLStatementTupleMapper.java      | 106 +++++++++++++
 .../query/impl/BoundStatementMapperBuilder.java | 109 -------------
 .../query/impl/InsertStatementBuilder.java      | 153 -------------------
 .../query/impl/PreparedStatementBinder.java     | 143 +++++++++++++++++
 .../query/impl/RoutingKeyGenerator.java         |  52 +++++++
 .../query/impl/SimpleCQLStatementMapper.java    |  88 +++++++++++
 .../query/impl/UpdateStatementBuilder.java      | 118 --------------
 .../cassandra/query/selector/FieldSelector.java |   8 +-
 .../cassandra/DynamicStatementBuilderTest.java  | 120 +++++++++------
 .../bolt/BatchCassandraWriterBoltTest.java      |  10 +-
 .../cassandra/bolt/CassandraWriterBoltTest.java |  10 +-
 .../cassandra/trident/TridentTopologyTest.java  |   6 +-
 28 files changed, 1020 insertions(+), 827 deletions(-)
----------------------------------------------------------------------



[2/6] storm git commit: STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector

Posted by bo...@apache.org.
STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2997cf9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2997cf9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2997cf9

Branch: refs/heads/master
Commit: c2997cf9f116297812505451269568484196b4c0
Parents: 6d7b968
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Sun Dec 6 22:23:29 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:19 2015 +0100

----------------------------------------------------------------------
 external/storm-cassandra/README.md              |  70 ++++++---
 .../cassandra/DynamicStatementBuilder.java      |  90 +++--------
 .../query/BaseCQLStatementTupleMapper.java      |  51 +++++++
 .../query/BatchStatementTupleMapper.java        |  57 -------
 .../cassandra/query/CQLClauseTupleMapper.java   |  36 -----
 .../cassandra/query/CQLStatementBuilder.java    |   4 +-
 .../query/CQLStatementTupleMapper.java          |  19 ---
 .../cassandra/query/CQLTableTupleMapper.java    |  39 -----
 .../cassandra/query/CQLValuesTupleMapper.java   |  74 ---------
 .../apache/storm/cassandra/query/Column.java    |  74 +++++++++
 .../apache/storm/cassandra/query/CqlMapper.java |  86 +++++++++++
 .../query/SimpleCQLStatementTupleMapper.java    |  51 -------
 .../builder/BoundCQLStatementMapperBuilder.java | 103 +++++++++++++
 .../SimpleCQLStatementMapperBuilder.java        |  91 +++++++++++
 .../impl/BatchCQLStatementTupleMapper.java      |  58 +++++++
 .../impl/BoundCQLStatementTupleMapper.java      | 106 +++++++++++++
 .../query/impl/BoundStatementMapperBuilder.java | 109 -------------
 .../query/impl/InsertStatementBuilder.java      | 153 -------------------
 .../query/impl/PreparedStatementBinder.java     | 136 +++++++++++++++++
 .../query/impl/RoutingKeyGenerator.java         |  52 +++++++
 .../query/impl/SimpleCQLStatementMapper.java    |  88 +++++++++++
 .../query/impl/UpdateStatementBuilder.java      | 118 --------------
 .../cassandra/query/selector/FieldSelector.java |   6 +-
 .../cassandra/DynamicStatementBuilderTest.java  | 108 +++++++------
 .../bolt/BatchCassandraWriterBoltTest.java      |  10 +-
 .../cassandra/bolt/CassandraWriterBoltTest.java |  10 +-
 26 files changed, 991 insertions(+), 808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 943fe5e..1edf708 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -39,17 +39,24 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 ```java
 
     new CassandraWriterBolt(
-        insertInto("album")
-            .values(
-                with(fields("title", "year", "performer", "genre", "tracks")
-                ).build());
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(
+                    fields("title", "year", "performer", "genre", "tracks")
+                 )
+            )
+    );
 ```
+
 ##### Insert query including all tuple fields.
 ```java
 
     new CassandraWriterBolt(
-        insertInto("album")
-            .values(all()).build());
+        async(
+            simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with( all() )
+            )
+    );
 ```
 
 ##### Insert multiple queries from one input tuple.
@@ -57,32 +64,49 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 
     new CassandraWriterBolt(
         async(
-        insertInto("titles_per_album").values(all()),
-        insertInto("titles_per_performer").values(all())
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
         )
     );
 ```
 
-##### Insert query including some fields with aliases
+##### Insert query using QueryBuilder
 ```java
 
     new CassandraWriterBolt(
-        insertInto("album")
-            .values(
-                with(field("ti"),as("title",
-                     field("ye").as("year")),
-                     field("pe").as("performer")),
-                     field("ge").as("genre")),
-                     field("tr").as("tracks")),
-                     ).build());
+        async(
+            simpleQuery("INSERT INTO album (title,year,perfomer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .with(all()))
+            )
+    )
 ```
 
 ##### Insert query with static bound query
 ```java
 
     new CassandraWriterBolt(
-         boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);")
-            .bind(all());
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+                .bind(all());
+         )
+    );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+    new CassandraWriterBolt(
+         async(
+            boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
+                .bind(
+                    field("ti"),as("title"),
+                    field("ye").as("year")),
+                    field("pe").as("performer")),
+                    field("ge").as("genre")),
+                    field("tr").as("tracks"))
+                ).byNamedSetters()
+         )
+    );
 ```
 
 ##### Insert query with bound statement load from storm configuration
@@ -106,14 +130,14 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 
     // Logged
     new CassandraWriterBolt(loggedBatch(
-        insertInto("title_per_album").values(all())
-        insertInto("title_per_performer").values(all())
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
         )
     );
 // UnLogged
     new CassandraWriterBolt(unLoggedBatch(
-        insertInto("title_per_album").values(all())
-        insertInto("title_per_performer").values(all())
+            simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())),
+            simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()))
         )
     );
 ```

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index deea8da..ce53ea3 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -19,10 +19,11 @@
 package org.apache.storm.cassandra;
 
 import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
 import org.apache.storm.cassandra.query.*;
-import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder;
-import org.apache.storm.cassandra.query.impl.InsertStatementBuilder;
-import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
+import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
 import org.apache.storm.cassandra.query.selector.FieldSelector;
 
 import java.io.Serializable;
@@ -33,73 +34,32 @@ public class DynamicStatementBuilder implements Serializable {
     private DynamicStatementBuilder() {
     }
 
-    /**
-     * Builds a new insert statement for the specified table.
-     *
-     * @param table the table's name.
-     * @return a new {@link InsertStatementBuilder} instance.
-     */
-    public static final InsertStatementBuilder insertInto(String table) {
-        return new InsertStatementBuilder(table);
-    }
-    /**
-     * Builds a new insert statement based on the specified CQL mapper.
-     *
-     * @param mapper the CQL mapper.
-     * @return a new {@link InsertStatementBuilder} instance.
-     */
-    public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) {
-        return new InsertStatementBuilder(mapper);
-    }
-    /**
-     * Builds a new insert statement for the specified keyspace and table.
-     *
-     * @param ks the keyspace to use.
-     * @param table the table's name.
-     * @return a new {@link InsertStatementBuilder} instance.
-     */
-    public static final InsertStatementBuilder insertInto(String ks, String table) {
-        return new InsertStatementBuilder(table, ks);
+    public static final SimpleCQLStatementMapperBuilder simpleQuery(String queryString) {
+        return new SimpleCQLStatementMapperBuilder(queryString);
     }
 
-    /**
-     * Builds a new update statement for the specified table.
-     *
-     * @param table the table's name.
-     * @return a new {@link UpdateStatementBuilder} instance.
-     */
-    public static final UpdateStatementBuilder update(String table) {
-        return new UpdateStatementBuilder(table);
-    }
-
-    /**
-     * Builds a new update statement for the specified keyspace and table.
-     *
-     * @param table the table's name.
-     * @return a new {@link UpdateStatementBuilder} instance.
-     */
-    public static final UpdateStatementBuilder update(String ks, String table) {
-        return new UpdateStatementBuilder(table, ks);
+    public static final SimpleCQLStatementMapperBuilder simpleQuery(BuiltStatement builtStatement) {
+        return new SimpleCQLStatementMapperBuilder(builtStatement);
     }
 
     /**
      * Builds a new bound statement based on the specified query.
      *
      * @param cql the query.
-     * @return a new {@link BoundStatementMapperBuilder} instance.
+     * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
      */
-    public static final BoundStatementMapperBuilder boundQuery(String cql) {
-        return new BoundStatementMapperBuilder(cql);
+    public static final BoundCQLStatementMapperBuilder boundQuery(String cql) {
+        return new BoundCQLStatementMapperBuilder(cql);
     }
 
     /**
      * Builds a new bound statement identified by the given field.
      *
      * @param field a context used to resolve the cassandra query.
-     * @return a new {@link BoundStatementMapperBuilder} instance.
+     * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
      */
-    public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) {
-        return new BoundStatementMapperBuilder(field);
+    public static final BoundCQLStatementMapperBuilder boundQuery(ContextQuery field) {
+        return new BoundCQLStatementMapperBuilder(field);
     }
 
     /**
@@ -115,27 +75,27 @@ public class DynamicStatementBuilder implements Serializable {
     /**
      * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders.
      */
-    public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
+    public static final BatchCQLStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
     }
     /**
      * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
      */
-    public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
+    public static final BatchCQLStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
     }
     /**
      * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders.
      */
-    public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
+    public static final BatchCQLStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders);
     }
 
-    private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
+    private static BatchCQLStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
         List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length);
         for(CQLStatementBuilder b : Arrays.asList(builders))
             mappers.add(b.build());
-        return new BatchStatementTupleMapper(type, mappers);
+        return new BatchCQLStatementTupleMapper(type, mappers);
     }
 
     /**
@@ -181,19 +141,11 @@ public class DynamicStatementBuilder implements Serializable {
         return fl.toArray(new FieldSelector[size]);
     }
 
-    /**
-     * Includes only the specified tuple fields.
-     *
-     * @param fields a list of field selector.
-     */
-    public static final CQLValuesTupleMapper with(final FieldSelector... fields) {
-        return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields));
-    }
 
     /**
      * Includes all tuple fields.
      */
-    public static final CQLValuesTupleMapper all() {
-        return new CQLValuesTupleMapper.AllTupleMapper();
+    public static final CqlMapper.DefaultCqlMapper all() {
+        return new CqlMapper.DefaultCqlMapper();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
new file mode 100644
index 0000000..c9ba6fa
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
+ *
+ */
+public abstract class BaseCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        return Arrays.asList(map(tuple));
+    }
+
+    /**
+     * Maps a given tuple to a single CQL statements.
+     *
+     * @param tuple the incoming tuple to map.
+     * @return a list of {@link com.datastax.driver.core.Statement}.
+     */
+    public abstract Statement map(ITuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
deleted file mode 100644
index 32ff1de..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-
-public class BatchStatementTupleMapper implements CQLStatementTupleMapper {
-
-    private final List<CQLStatementTupleMapper> mappers;
-    private final BatchStatement.Type type;
-
-    /**
-     * Creates a new {@link BatchStatementTupleMapper} instance.
-     * @param type
-     * @param mappers
-     */
-    public BatchStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
-        this.mappers = new ArrayList<>(mappers);
-        this.type = type;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Statement> map(Map conf, Session session, ITuple tuple) {
-        final BatchStatement batch = new BatchStatement(this.type);
-        for(CQLStatementTupleMapper m : mappers)
-            batch.addAll(m.map(conf, session, tuple));
-        return Arrays.asList((Statement)batch);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
deleted file mode 100644
index baa4685..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.querybuilder.Clause;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to
- * a list of cassandra {@link com.datastax.driver.core.querybuilder.Clause}s.
- *
- */
-public interface CQLClauseTupleMapper extends Serializable {
-
-    List<Clause> map(ITuple tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
index 6c04d1c..9ddb58a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
@@ -21,11 +21,11 @@ package org.apache.storm.cassandra.query;
 import java.io.Serializable;
 
 
-public interface CQLStatementBuilder extends Serializable {
+public interface CQLStatementBuilder<T extends CQLStatementTupleMapper> extends Serializable {
 
     /**
      * Builds a new {@link CQLStatementTupleMapper} instance.
      * @return a new CQLStatementMapper instance.
      */
-    <T extends CQLStatementTupleMapper> T build();
+    T build();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
index 32c86dd..b100d9a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -38,10 +38,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
  */
 public interface CQLStatementTupleMapper extends Serializable {
 
-    public static final String FIELD_KEYSPACE =  "keyspace";
-    public static final String FIELD_TABLE    =  "table";
-    public static final String FIELD_VALUES   =  "value";
-
     /**
      * Maps a given tuple to one or multiple CQL statements.
      *
@@ -52,21 +48,6 @@ public interface CQLStatementTupleMapper extends Serializable {
      */
     List<Statement> map(Map conf, Session session, ITuple tuple);
 
-    public static class InsertCQLStatementTupleMapper implements CQLStatementTupleMapper {
-        @Override
-        public List<Statement> map(Map conf, Session session, ITuple tuple) {
-            Optional<String> ks = Optional.fromNullable(tuple.contains(FIELD_KEYSPACE) ? tuple.getStringByField(FIELD_KEYSPACE) : null);
-            String table = tuple.getStringByField(FIELD_TABLE);
-            Map<String, Object> values = (Map<String, Object>) tuple.getValueByField(FIELD_VALUES);
-
-            final Insert stmt = (ks.isPresent()) ? insertInto(ks.get(), table) : insertInto(table);
-            for(Map.Entry<String, Object> v : values.entrySet())
-                stmt.value(v.getKey(), v.getValue());
-
-            return Arrays.asList((Statement)stmt);
-        }
-    }
-
     public static class DynamicCQLStatementTupleMapper implements CQLStatementTupleMapper {
         private List<CQLStatementBuilder> builders;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
deleted file mode 100644
index 574eac9..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to a table name.
- *
- */
-public interface CQLTableTupleMapper extends Serializable {
-
-    /**
-     * Returns a table's name from the specified tuple.
-     *
-     * @param tuple the incoming tuple.
-     * @return the table name.
-     */
-    String map(ITuple tuple);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
deleted file mode 100644
index 2cb8e4c..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import org.apache.storm.cassandra.query.selector.FieldSelector;
-
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to Map of values of a CQL statement.
- *
- */
-public interface CQLValuesTupleMapper extends Serializable {
-
-    /**
-     * Map the specified {@code tuple} to values of CQL statement(s).
-     * @param tuple the incoming tuple to map.
-     * @return values of CQL statement(s)
-     */
-    Map<String, Object> map(ITuple tuple);
-
-    /**
-     * Default {@link CQLValuesTupleMapper} implementation to get specifics tuple's fields.
-     */
-    public static class WithFieldTupleMapper implements CQLValuesTupleMapper {
-        private List<FieldSelector> fields;
-
-        public WithFieldTupleMapper(List<FieldSelector> fields) {
-            this.fields = fields;
-        }
-
-        @Override
-        public Map<String, Object> map(ITuple tuple) {
-            Map<String, Object> ret = new LinkedHashMap<>();
-            for(FieldSelector fs : fields)
-                fs.selectAndPut(tuple, ret);
-            return ret;
-        }
-    }
-
-    /**
-     * Default {@link CQLValuesTupleMapper} implementation to get all tuple's fields.
-     */
-    public static class AllTupleMapper implements CQLValuesTupleMapper {
-        @Override
-        public Map<String, Object> map(ITuple tuple) {
-            Map<String, Object> ret = new LinkedHashMap<>();
-            for(String name  : tuple.getFields())
-                ret.put(name, tuple.getValueByField(name));
-            return ret;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
new file mode 100644
index 0000000..06643a5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class Column<T> implements Serializable {
+
+    private final String columnName;
+    private final T val;
+
+    public Column(String columnName, T val) {
+        this.columnName = columnName;
+        this.val = val;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public T getVal() {
+        return val;
+    }
+
+    public boolean isNull() { return val == null;}
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Column column = (Column) o;
+
+        if (columnName != null ? !columnName.equals(column.columnName) : column.columnName != null) return false;
+        if (val != null ? !val.equals(column.val) : column.val != null) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = columnName != null ? columnName.hashCode() : 0;
+        result = 31 * result + (val != null ? val.hashCode() : 0);
+        return result;
+    }
+
+    public static Object[] getVals(List<Column> columns) {
+        List<Object> vals = new ArrayList<>(columns.size());
+        for(Column c : columns)
+            vals.add(c.getVal());
+        return vals.toArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
new file mode 100644
index 0000000..2ab8f92
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default interface to defines how a storm tuple maps to a list of columns representing a row in a database.
+ */
+public interface CqlMapper extends Serializable {
+
+    /**
+     * Maps the specified input tuple to a list of CQL columns.
+     * @param tuple the input tuple.
+     * @return the list of
+     */
+    List<Column> map(ITuple tuple);
+
+
+    public static final class SelectableCqlMapper implements CqlMapper {
+
+        private final List<FieldSelector> selectors;
+
+        /**
+         * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.DefaultCqlMapper} instance.
+         * @param selectors list of selectors used to extract column values from tuple.
+         */
+        public SelectableCqlMapper(List<FieldSelector> selectors) {
+            this.selectors = selectors;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public List<Column> map(ITuple tuple) {
+            List<Column> columns = new ArrayList<>(selectors.size());
+            for(FieldSelector selector : selectors)
+                columns.add(selector.select(tuple));
+            return columns;
+        }
+    }
+
+    /**
+     * Default {@link CqlMapper} to map all tuple values to column.
+     */
+    public static final class DefaultCqlMapper implements CqlMapper {
+
+        /**
+         * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.DefaultCqlMapper} instance.
+         */
+        public DefaultCqlMapper() {}
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public List<Column> map(ITuple tuple) {
+            List<Column> columns = new ArrayList<>(tuple.size());
+            for(String name  : tuple.getFields())
+                columns.add(new Column(name, tuple.getValueByField(name)));
+            return columns;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
deleted file mode 100644
index 683824e..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
- *
- */
-public abstract class SimpleCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public List<Statement> map(Map conf, Session session, ITuple tuple) {
-        return Arrays.asList(map(tuple));
-    }
-
-    /**
-     * Maps a given tuple to a single CQL statements.
-     *
-     * @param tuple the incoming tuple to map.
-     * @return a list of {@link com.datastax.driver.core.Statement}.
-     */
-    public abstract Statement map(ITuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..52502b0
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.BoundCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.PreparedStatementBinder;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.storm.cassandra.query.ContextQuery.*;
+
+public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<BoundCQLStatementTupleMapper>,  Serializable {
+
+    private final ContextQuery contextQuery;
+
+    private CqlMapper mapper;
+
+    private List<String> routingKeys;
+
+    private PreparedStatementBinder binder;
+
+    /**
+     * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+     * @param cql
+     */
+    public BoundCQLStatementMapperBuilder(String cql) {
+        this.contextQuery = new StaticContextQuery(cql);
+    }
+
+    /**
+     * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+     * @param contextQuery
+     */
+    public BoundCQLStatementMapperBuilder(ContextQuery contextQuery) {
+        this.contextQuery = contextQuery;
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param fields a list of field selector.
+     */
+    public final BoundCQLStatementMapperBuilder bind(final FieldSelector... fields) {
+        this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+        return this;
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param mapper a column mapper.
+     */
+    public final BoundCQLStatementMapperBuilder bind(CqlMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public final BoundCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+        this.routingKeys = Arrays.asList(fields);
+        return this;
+    }
+
+    public final BoundCQLStatementMapperBuilder byNamedSetters() {
+        this.binder = new PreparedStatementBinder.CQL3NamedSettersBinder();
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public BoundCQLStatementTupleMapper build() {
+        return new BoundCQLStatementTupleMapper(contextQuery, mapper, getRoutingKeyGenerator(), getStatementBinder());
+    }
+
+    private RoutingKeyGenerator getRoutingKeyGenerator() {
+        return (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null;
+    }
+
+    private PreparedStatementBinder getStatementBinder() {
+        return (binder != null) ? binder : new PreparedStatementBinder.DefaultBinder();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..401f1bf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Default class to build {@link org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper} instance.
+ */
+public class SimpleCQLStatementMapperBuilder implements CQLStatementBuilder<SimpleCQLStatementMapper>, Serializable {
+
+    private final String queryString;
+
+    private CqlMapper mapper;
+
+    private List<String> routingKeys;
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+     * @param queryString a valid CQL query string.
+     */
+    public SimpleCQLStatementMapperBuilder(String queryString) {
+        this.queryString = queryString;
+    }
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+     * @param builtStatement a query built statements
+     */
+    public SimpleCQLStatementMapperBuilder(BuiltStatement builtStatement) {
+        this.queryString = builtStatement.getQueryString();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SimpleCQLStatementMapper build() {
+        return new SimpleCQLStatementMapper(queryString, mapper, (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null );
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param fields a list of field selector.
+     */
+    public final SimpleCQLStatementMapperBuilder with(final FieldSelector... fields) {
+        this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+        return this;
+    }
+
+    public final SimpleCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+        this.routingKeys = Arrays.asList(fields);
+        return this;
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param mapper a column mapper.
+     */
+    public final SimpleCQLStatementMapperBuilder with(CqlMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
new file mode 100644
index 0000000..fe948f5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+public class BatchCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+    private final List<CQLStatementTupleMapper> mappers;
+    private final BatchStatement.Type type;
+
+    /**
+     * Creates a new {@link BatchCQLStatementTupleMapper} instance.
+     * @param type
+     * @param mappers
+     */
+    public BatchCQLStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
+        this.mappers = new ArrayList<>(mappers);
+        this.type = type;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        final BatchStatement batch = new BatchStatement(this.type);
+        for(CQLStatementTupleMapper m : mappers)
+            batch.addAll(m.map(conf, session, tuple));
+        return Arrays.asList((Statement)batch);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
new file mode 100644
index 0000000..8cce418
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+    private final ContextQuery contextQuery;
+
+    private final CqlMapper mapper;
+
+    private Map<String, PreparedStatement> cache = new HashMap<>();
+
+    private final RoutingKeyGenerator rkGenerator;
+
+    private final PreparedStatementBinder binder;
+
+    /**
+     * Creates a new {@link BoundCQLStatementTupleMapper} instance.
+     *
+     * @param contextQuery
+     * @param mapper
+     * @param mapper
+     * @param rkGenerator
+     * @param binder
+     */
+    public BoundCQLStatementTupleMapper(ContextQuery contextQuery, CqlMapper mapper, RoutingKeyGenerator rkGenerator, PreparedStatementBinder binder) {
+        Preconditions.checkNotNull(contextQuery, "ContextQuery must not be null");
+        Preconditions.checkNotNull(mapper, "Mapper should not be null");
+        this.contextQuery = contextQuery;
+        this.mapper = mapper;
+        this.rkGenerator = rkGenerator;
+        this.binder = binder;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Statement> map(Map config, Session session, ITuple tuple) {
+        final List<Column> columns = mapper.map(tuple);
+
+        final String query = contextQuery.resolves(config, tuple);
+
+        PreparedStatement statement = getPreparedStatement(session, query);
+        if(hasRoutingKeys()) {
+            List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple);
+            if( keys.size() == 1)
+                statement.setRoutingKey(keys.get(0));
+            else
+                statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()]));
+        }
+
+        return Arrays.asList((Statement) this.binder.apply(statement, columns));
+    }
+
+    private boolean hasRoutingKeys() {
+        return rkGenerator != null;
+    }
+
+    /**
+     * Get or prepare a statement using the specified session and the query.
+     * *
+     * @param session The cassandra session.
+     * @param query The CQL query to prepare.
+     */
+    private PreparedStatement getPreparedStatement(Session session, String query) {
+        PreparedStatement statement = cache.get(query);
+        if( statement == null) {
+            statement = session.prepare(query);
+            cache.put(query, statement);
+        }
+        return statement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
deleted file mode 100644
index b7ded14..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.ContextQuery;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
-
-
-public class BoundStatementMapperBuilder implements Serializable {
-    private final ContextQuery contextQuery;
-
-    /**
-     * Creates a new {@link BoundStatementMapperBuilder} instance.
-     * @param cql
-     */
-    public BoundStatementMapperBuilder(String cql) {
-        this.contextQuery = new StaticContextQuery(cql);
-    }
-
-    /**
-     * Creates a new {@link BoundStatementMapperBuilder} instance.
-     * @param contextQuery
-     */
-    public BoundStatementMapperBuilder(ContextQuery contextQuery) {
-        this.contextQuery = contextQuery;
-    }
-
-    public CQLStatementTupleMapper bind(final CQLValuesTupleMapper mapper) {
-        return new CQLBoundStatementTupleMapper(contextQuery, mapper);
-    }
-
-    public static class CQLBoundStatementTupleMapper implements CQLStatementTupleMapper {
-
-        private final ContextQuery contextQuery;
-
-        private final CQLValuesTupleMapper mapper;
-
-        // should be a lru-map or weakhashmap else this may lead to memory leaks.
-        private Map<String, PreparedStatement> cache = new HashMap<>();
-
-        /**
-         * Creates a new {@link CQLBoundStatementTupleMapper} instance.
-         *
-         * @param contextQuery
-         * @param mapper
-         */
-        CQLBoundStatementTupleMapper(ContextQuery contextQuery, CQLValuesTupleMapper mapper) {
-            this.contextQuery = contextQuery;
-            this.mapper = mapper;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public List<Statement> map(Map config, Session session, ITuple tuple) {
-            final Map<String, Object> values = mapper.map(tuple);
-            final String query = contextQuery.resolves(config, tuple);
-            Object[] objects = values.values().toArray(new Object[values.size()]);
-            PreparedStatement statement = getPreparedStatement(session, query);
-            // todo bind objects in the same sequence as the statement expects.
-            return Arrays.asList((Statement) statement.bind(objects));
-        }
-
-        /**
-         * Get or prepare a statement using the specified session and the query.
-         * *
-         * @param session The cassandra session.
-         * @param query The CQL query to prepare.
-         */
-        private PreparedStatement getPreparedStatement(Session session, String query) {
-            PreparedStatement statement = cache.get(query);
-            if (statement == null) {
-                statement = session.prepare(query);
-                cache.put(query, statement);
-            }
-            return statement;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
deleted file mode 100644
index 8eddb04..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.base.Preconditions;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLTableTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class InsertStatementBuilder<T extends CQLValuesTupleMapper> implements CQLStatementBuilder {
-    /**
-     * Optional name of the table into insert.
-     */
-    protected final String table;
-    /**
-     * Optional keyspace name to insert.
-     */
-    protected final String keyspace;
-    /**
-     * Optional mapper to retrieve a table name from tuple.
-     */
-    protected final CQLTableTupleMapper mapper2TableName;
-
-    protected T valuesMapper;
-
-    protected boolean ifNotExist;
-
-    protected Integer ttlInSeconds;
-
-    protected ConsistencyLevel level;
-
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param table    name of the table into insert.
-     */
-    public InsertStatementBuilder(String table) {
-        this(null, table, null);
-    }
-
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param table    name of the table into insert.
-     * @param keyspace the keyspace's name to which the table belongs
-     */
-    public InsertStatementBuilder(String table, String keyspace) {
-        this(keyspace, table, null);
-    }
-
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param mapper2TableName the mapper that specify the table in which insert
-     * @param keyspace         the name of the keyspace to which the table belongs
-     */
-    public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName, String keyspace) {
-        this(keyspace, null, mapper2TableName);
-    }
-    /**
-     * Creates a new {@link InsertStatementBuilder} instance.
-     *
-     * @param mapper2TableName the mapper that specify the table in which insert
-     */
-    public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName) {
-        this(null, null, mapper2TableName);
-    }
-
-    private InsertStatementBuilder(String keyspace, String table, CQLTableTupleMapper mapper2TableName) {
-        this.keyspace = keyspace;
-        this.table = table;
-        this.mapper2TableName = mapper2TableName;
-    }
-
-    /**
-     * Adds "IF NOT EXISTS" clause to the statement.
-     * @see com.datastax.driver.core.querybuilder.Insert#ifNotExists()
-     */
-    public InsertStatementBuilder ifNotExists() {
-        this.ifNotExist = true;
-        return this;
-    }
-
-    public InsertStatementBuilder usingTTL(Long time, TimeUnit unit) {
-        this.ttlInSeconds = (int) unit.toSeconds(time);
-        return this;
-    }
-
-    /**
-     * Sets the consistency level used for this statement.
-     *
-     * @param level a ConsistencyLevel.
-     */
-    public InsertStatementBuilder withConsistencyLevel(ConsistencyLevel level) {
-        this.level = level;
-        return this;
-    }
-
-    /**
-     * Maps tuple to values.
-     *
-     * @see com.datastax.driver.core.querybuilder.Insert#value(String, Object)
-     */
-    public InsertStatementBuilder values(final T valuesMapper) {
-        this.valuesMapper = valuesMapper;
-        return this;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public SimpleCQLStatementTupleMapper build() {
-        Preconditions.checkState(null != table, "Table name must not be null");
-        return new SimpleCQLStatementTupleMapper() {
-            @Override
-            public Statement map(ITuple tuple) {
-                Insert stmt = QueryBuilder.insertInto(keyspace, table);
-                for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
-                    stmt.value(entry.getKey(), entry.getValue());
-                if (ttlInSeconds != null) stmt.using(QueryBuilder.ttl(ttlInSeconds));
-                if (ifNotExist) stmt.ifNotExists();
-                if (level != null) stmt.setConsistencyLevel(level);
-                return stmt;
-            }
-        };
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
new file mode 100644
index 0000000..7a6c78d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import com.datastax.driver.core.*;
+import org.apache.storm.cassandra.query.Column;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ *
+ */
+public interface PreparedStatementBinder extends Serializable {
+
+    public BoundStatement apply(PreparedStatement statement, List<Column> columns);
+
+    public static final class DefaultBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            Object[] values = Column.getVals(columns);
+            return statement.bind(values);
+        }
+    }
+
+    public static final class CQL3NamedSettersBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            Object[] values = Column.getVals(columns);
+
+            BoundStatement boundStatement = statement.bind();
+            for(Column col : columns) {
+                // For native protocol V3 or below, all variables must be bound.
+                // With native protocol V4 or above, variables can be left unset,
+                // in which case they will be ignored server side (no tombstones will be generated).
+                if(col.isNull()) {
+                    boundStatement.setToNull(col.getColumnName());
+                } else {
+                    bind(boundStatement, col.getColumnName(), col.getVal());
+                }
+            }
+            return statement.bind(values);
+        }
+
+        /**
+         * This ugly method comes from {@link com.datastax.driver.core.TypeCodec#getDataTypeFor(Object)}.
+         */
+        static void bind(BoundStatement statement, String name, Object value) {
+            // Starts with ByteBuffer, so that if already serialized value are provided, we don't have the
+            // cost of testing a bunch of other types first
+            if (value instanceof ByteBuffer)
+                statement.setBytes(name, (ByteBuffer)value);
+
+            if (value instanceof Number) {
+                if (value instanceof Integer)
+                    statement.setInt(name, (Integer)value);
+                if (value instanceof Long)
+                    statement.setLong(name, (Long) value);
+                if (value instanceof Float)
+                    statement.setFloat(name, (Float) value);
+                if (value instanceof Double)
+                    statement.setDouble(name, (Double)value);
+                if (value instanceof BigDecimal)
+                    statement.setDecimal(name, (BigDecimal)value);
+                if (value instanceof BigInteger)
+                    statement.setVarint(name, (BigInteger)value);
+                throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
+            }
+
+            if (value instanceof String)
+                statement.setString(name, (String)value);
+
+            if (value instanceof Boolean)
+                statement.setBool(name, (Boolean)value);
+
+            if (value instanceof InetAddress)
+                statement.setInet(name, (InetAddress)value);
+
+            if (value instanceof Date)
+                statement.setDate(name, (Date)value);
+
+            if (value instanceof UUID)
+                statement.setUUID(name, (UUID)value);
+
+            if (value instanceof List) {
+                statement.setList(name, (List)value);
+            }
+
+            if (value instanceof Set) {
+                statement.setSet(name, (Set)value);
+            }
+
+            if (value instanceof Map) {
+                statement.setMap(name, (Map) value);
+            }
+
+            if (value instanceof UDTValue) {
+                statement.setUDTValue(name, (UDTValue)value);
+            }
+
+            if (value instanceof TupleValue) {
+                statement.setTupleValue(name, (TupleValue) value);
+            }
+
+            throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
new file mode 100644
index 0000000..3f4f47e
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RoutingKeyGenerator implements Serializable {
+
+    private List<String> routingKeys;
+
+    /**
+     * Creates a new {@link RoutingKeyGenerator} instance.
+     * @param routingKeys
+     */
+    public RoutingKeyGenerator(List<String> routingKeys) {
+        Preconditions.checkNotNull(routingKeys);
+        this.routingKeys = routingKeys;
+    }
+
+    public List<ByteBuffer> getRoutingKeys(ITuple tuple) {
+        List<ByteBuffer> keys = new ArrayList<>(routingKeys.size());
+        for(String s : routingKeys) {
+            Object value = tuple.getValueByField(s);
+            keys.add(DataType.serializeValue(value, ProtocolVersion.NEWEST_SUPPORTED));
+        }
+        return keys;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
new file mode 100644
index 0000000..ab9adbf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class SimpleCQLStatementMapper implements CQLStatementTupleMapper {
+
+    private final String queryString;
+    private final CqlMapper mapper;
+    private final RoutingKeyGenerator rkGenerator;
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper) {
+        this(queryString, mapper, null);
+    }
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper, RoutingKeyGenerator rkGenerator) {
+        Preconditions.checkNotNull(queryString, "Query string must not be null");
+        Preconditions.checkNotNull(mapper, "Mapper should not be null");
+        this.queryString = queryString;
+        this.mapper = mapper;
+        this.rkGenerator = rkGenerator;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        List<Column> columns = mapper.map(tuple);
+        SimpleStatement statement = new SimpleStatement(queryString, Column.getVals(columns));
+
+        if(hasRoutingKeys()) {
+            List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple);
+            if( keys.size() == 1)
+                statement.setRoutingKey(keys.get(0));
+            else
+                statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()]));
+        }
+
+        return Arrays.asList((Statement) statement);
+    }
+
+    private boolean hasRoutingKeys() {
+        return rkGenerator != null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
deleted file mode 100644
index 70696ab..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Clause;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Update;
-import org.apache.storm.cassandra.query.CQLClauseTupleMapper;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
-
-public final class UpdateStatementBuilder implements CQLStatementBuilder {
-
-    /**
-     * The name of table to update. 
-     */
-    private final String table;
-    /**
-     * The keyspace of the table.
-     */
-    private final String keyspace;
-
-    private CQLValuesTupleMapper valuesMapper;
-
-    private CQLClauseTupleMapper clausesMapper;
-
-    private CQLClauseTupleMapper onlyIfClausesMapper;
-    /**
-     * Creates a new {@link UpdateStatementBuilder} instance.
-     * @param table the name of table to update.
-     */
-    public UpdateStatementBuilder(String table) {
-        this(table, null);
-    }
-
-    /**
-     * Creates a new {@link UpdateStatementBuilder} instance.
-     * @param table the name of table to update.
-     * @param keyspace the keyspace of the table.
-     */
-    public UpdateStatementBuilder(String table, String keyspace) {
-        this.table = table;
-        this.keyspace = keyspace;
-    }
-
-    /**
-     * Maps output tuple to values.
-     * @see com.datastax.driver.core.querybuilder.Update#with(com.datastax.driver.core.querybuilder.Assignment)
-     */
-    public UpdateStatementBuilder with(final CQLValuesTupleMapper values) {
-        this.valuesMapper = values;
-        return this;
-    }
-
-    /**
-     * Maps output tuple to some Where clauses.
-     * @see com.datastax.driver.core.querybuilder.Update#where(com.datastax.driver.core.querybuilder.Clause)
-     */
-    public UpdateStatementBuilder where(final CQLClauseTupleMapper clauses) {
-        this.clausesMapper = clauses;
-        return this;
-    }
-
-    /**
-     * Maps output tuple to some If clauses.
-     * @see com.datastax.driver.core.querybuilder.Update#onlyIf(com.datastax.driver.core.querybuilder.Clause)
-     */
-    public UpdateStatementBuilder onlyIf(final CQLClauseTupleMapper clauses) {
-        this.onlyIfClausesMapper = clauses;
-        return this;
-    }
-
-    @Override
-    public SimpleCQLStatementTupleMapper build() {
-        return new SimpleCQLStatementTupleMapper() {
-            @Override
-            public Statement map(ITuple tuple) {
-                Update stmt = QueryBuilder.update(keyspace, table);
-                for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
-                    stmt.with(set(entry.getKey(), entry.getValue()));
-                for(Clause clause : clausesMapper.map(tuple))
-                    stmt.where(clause);
-                if( hasIfClauses())
-                    for(Clause clause : onlyIfClausesMapper.map(tuple)) {
-                        stmt.onlyIf(clause);
-                    }
-                return stmt;
-            }
-        };
-    }
-
-    private boolean hasIfClauses() {
-        return onlyIfClausesMapper != null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
index b68138a..8ce651e 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -20,6 +20,7 @@ package org.apache.storm.cassandra.query.selector;
 
 import backtype.storm.tuple.ITuple;
 import com.datastax.driver.core.utils.UUIDs;
+import org.apache.storm.cassandra.query.Column;
 
 import java.io.Serializable;
 import java.util.Map;
@@ -41,9 +42,8 @@ public class FieldSelector implements Serializable {
         this.field = field;
     }
 
-    public void selectAndPut(ITuple t, Map<String, Object> values) {
-        values.put(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
-
+    public Column select(ITuple t) {
+        return new Column<>(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
     }
 
     /**