You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 00:45:56 UTC

[2/4] storm git commit: STORM-1369: Add MapState implementation to storm-cassandra.

STORM-1369: Add MapState implementation to storm-cassandra.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eb6c3089
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eb6c3089
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eb6c3089

Branch: refs/heads/master
Commit: eb6c308925b2a4cd89031a2a6b951976deb6262f
Parents: b7fe860
Author: Magnus Koch <ma...@ef.com>
Authored: Mon Mar 6 11:39:34 2017 +0100
Committer: Magnus Koch <ma...@ef.com>
Committed: Mon Mar 6 11:39:34 2017 +0100

----------------------------------------------------------------------
 external/storm-cassandra/README.md              | 159 +++++++++---
 external/storm-cassandra/pom.xml                |   9 +
 .../storm/cassandra/executor/AsyncExecutor.java | 167 ++++++++++++-
 .../executor/AsyncExecutorProvider.java         |   2 +-
 .../executor/AsyncResultSetHandler.java         |  58 +++++
 .../query/AyncCQLResultSetValuesMapper.java     |  36 +++
 .../trident/state/CassandraBackingMap.java      | 241 +++++++++++++++++++
 .../trident/state/CassandraMapStateFactory.java | 106 ++++++++
 .../trident/state/MapStateFactoryBuilder.java   | 226 +++++++++++++++++
 .../state/NonTransactionalTupleStateMapper.java |  64 +++++
 .../trident/state/OpaqueTupleStateMapper.java   | 127 ++++++++++
 .../trident/state/SerializedStateMapper.java    |  67 ++++++
 .../trident/state/SimpleStateMapper.java        | 104 ++++++++
 .../cassandra/trident/state/SimpleTuple.java    | 213 ++++++++++++++++
 .../cassandra/trident/state/StateMapper.java    |  35 +++
 .../state/TransactionalTupleStateMapper.java    | 105 ++++++++
 .../TridentAyncCQLResultSetValuesMapper.java    | 117 +++++++++
 .../testtools/EmbeddedCassandraResource.java    | 193 +++++++++++++++
 .../storm/cassandra/trident/MapStateTest.java   | 230 ++++++++++++++++++
 .../src/test/resources/cassandra.yaml           |  39 +++
 20 files changed, 2261 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 1edf708..9f543c9 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -3,13 +3,13 @@ Storm Cassandra Integration (CQL).
 
 [Apache Storm](https://storm.apache.org/) is a free and open source distributed realtime computation system.
 
-### Bolt API implementation for Apache Cassandra
+## Bolt API implementation for Apache Cassandra
 
 This library provides core storm bolt on top of Apache Cassandra.
 Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*.
 
 
-### Configuration
+## Configuration
 The following properties may be passed to storm configuration.
 
 | **Property name**                            | **Description** | **Default**         |
@@ -25,17 +25,17 @@ The following properties may be passed to storm configuration.
 | **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)            |
 | **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)          |
 
-### CassandraWriterBolt
+## CassandraWriterBolt
 
-####Static import
+### Static import
 ```java
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 
 ```
 
-#### Insert Query Builder
-##### Insert query including only the specified tuple fields.
+### Insert Query Builder
+#### Insert query including only the specified tuple fields.
 ```java
 
     new CassandraWriterBolt(
@@ -48,7 +48,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query including all tuple fields.
+#### Insert query including all tuple fields.
 ```java
 
     new CassandraWriterBolt(
@@ -59,7 +59,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert multiple queries from one input tuple.
+#### Insert multiple queries from one input tuple.
 ```java
 
     new CassandraWriterBolt(
@@ -70,7 +70,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query using QueryBuilder
+#### Insert query using QueryBuilder
 ```java
 
     new CassandraWriterBolt(
@@ -81,7 +81,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     )
 ```
 
-##### Insert query with static bound query
+#### Insert query with static bound query
 ```java
 
     new CassandraWriterBolt(
@@ -92,7 +92,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query with static bound query using named setters and aliases
+#### Insert query with static bound query using named setters and aliases
 ```java
 
     new CassandraWriterBolt(
@@ -109,7 +109,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query with bound statement load from storm configuration
+#### Insert query with bound statement load from storm configuration
 ```java
 
     new CassandraWriterBolt(
@@ -117,7 +117,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
             .bind(all());
 ```
 
-##### Insert query with bound statement load from tuple field
+#### Insert query with bound statement load from tuple field
 ```java
 
     new CassandraWriterBolt(
@@ -125,7 +125,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
             .bind(all());
 ```
 
-##### Insert query with batch statement
+#### Insert query with batch statement
 ```java
 
     // Logged
@@ -202,34 +202,127 @@ builder.setBolt("BOLT_WRITER", bolt, 4)
         .customGrouping("spout", new Murmur3StreamGrouping("title"))
 ```
 
-### Trident API support
-storm-cassandra support Trident `state` API for `inserting` data into Cassandra. 
+## Trident State Support
+
+For a state factory which writes output to Cassandra, use ```CassandraStateFactory``` with an ```INSERT INTO``` statement:
+
 ```java
-        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+
+        // Build state
         CQLStatementTupleMapper insertTemperatureValues = boundQuery(
                 "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
-                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
-        options.withCQLStatementTupleMapper(insertTemperatureValues);
+                .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
+                .build();
+
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext())
+                .withCQLStatementTupleMapper(insertTemperatureValues);
+
         CassandraStateFactory insertValuesStateFactory =  new CassandraStateFactory(options);
-        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
-        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
-        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
-        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+        
+        // Use state in existing stream
+        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater());
+
+```
+
+For a state factory which can query Cassandra, use ```CassandraStateFactory``` with a ```SELECT``` statement:
+
+```java
+
+        // Build state
+        CQLStatementTupleMapper selectStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+                .bind(field("weather_station_id").as("id"))
+                .build();
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext())
+                .withCQLStatementTupleMapper(selectStationName)
+                .withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+        CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
+        
+        // Append query to existing stream
+        stream.stateQuery(selectWeatherStationStateFactory, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+
+```
+
+## Trident MapState Support
+
+For a MapState with Cassandra IBackingMap, the simplest option is to use a ```MapStateFactoryBuilder``` which generates CQL statements automatically. 
+The builder supports opaque, transactional and non-transactional map states.
+
+To store values in Cassandra you need to provide a ```StateMapper``` that maps the value to fields.  
+
+For simple values, the ```SimpleStateMapper``` can be used:
+
+```java
+        StateFactory mapState = MapStateFactoryBuilder.opaque()
+                .withTable("mykeyspace", "year_month_state")
+                .withKeys("year", "month")
+                .withStateMapper(SimpleStateMapper.opqaue("txid", "sum", "prevSum"))
+                .build();
 ```
 
-Below `state` API for `querying` data from Cassandra.
+For complex values you can either custom build a state mapper, or use binary serialization:
+
+```java
+        StateFactory mapState = MapStateFactoryBuilder.opaque()
+                .withTable("mykeyspace", "year_month_state")
+                .withKeys("year", "month")
+                .withJSONBinaryState("state")
+                .build();
+```
+
+The JSONBinary methods use the storm JSON serializers, but you can also provide custom serializers if you want.
+
+For instance, the ```NonTransactionalTupleStateMapper```, ```TransactionalTupleStateMapper``` or ```OpaqueTupleStateMapper```
+classes can be used if the map state uses tuples as values.
+
 ```java
-        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
-        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
-                 .bind(with(field("weather_station_id").as("id")));
-        options.withCQLStatementTupleMapper(insertTemperatureValues);
-        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
-        CassandraStateFactory selectWeatherStationStateFactory =  new CassandraStateFactory(options);
-        CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
-        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
-        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));         
+        StateFactory mapState = MapStateFactoryBuilder.<ITuple>nontransactional()
+                .withTable("mykeyspace", "year_month_state")
+                .withKeys("year", "month")
+                .withStateMapper(new NonTransactionalTupleStateMapper("latest_value"))
+                .build();
 ```
 
+Alternatively, you can construct a ```CassandraMapStateFactory``` yourself:
+
+```java
+
+        CQLStatementTupleMapper get = simpleQuery("SELECT state FROM words_ks.words_table WHERE word = ?")
+                .with(fields("word"))
+                .build();
+
+        CQLStatementTupleMapper put = simpleQuery("INSERT INTO words_ks.words_table (word, state) VALUES (?, ?)")
+                .with(fields("word", "state"))
+                .build();
+
+        CassandraBackingMap.Options<Integer> mapStateOptions = new CassandraBackingMap.Options<Integer>(new CassandraContext())
+                .withBatching(BatchStatement.Type.UNLOGGED)
+                .withKeys(new Fields("word"))
+                .withNonTransactionalJSONBinaryState("state")
+                .withMultiGetCQLStatementMapper(get)
+                .withMultiPutCQLStatementMapper(put);
+
+        CassandraMapStateFactory factory = CassandraMapStateFactory.nonTransactional(mapStateOptions)
+                .withCache(0);
+
+```
+
+### MapState Parallelism
+
+The backing map implementation submits queries (gets and puts) in parallel to the Cassandra cluster.
+The default number of parallel requests is based on the driver configuration, which ends up being 128 with
+the default driver configuration. The maximum parallelism applies to the cluster as a whole, and to each 
+state instance (per worker, not executor).
+
+The default calculation is:
+  default = min(max local, max remote) / 2
+  
+which normally means:
+  min(1024, 256) / 2 = 128
+
+This is deliberately conservative to avoid issues in most setups. If this does not provide sufficient 
+throughput you can either explicitly override the max parallelism on the state builder/factory/backingmap, 
+or you can update the driver configuration.
+
 ## License
 
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 0eff9f6..ed7ccf4 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -38,6 +38,7 @@
         <guava.version>16.0.1</guava.version>
         <commons-lang3.version>3.3</commons-lang3.version>
         <cassandra.driver.core.version>3.1.2</cassandra.driver.core.version>
+        <cassandra.version>2.1.7</cassandra.version>
     </properties>
 
     <developers>
@@ -70,6 +71,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>cassandra-all</artifactId>
+            <version>${cassandra.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
@@ -100,5 +108,6 @@
             <version>1.10.19</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
index 5366c81..63b81fe 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
@@ -22,17 +22,22 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.storm.topology.FailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -106,6 +111,7 @@ public class AsyncExecutor<T> implements Serializable {
     public SettableFuture<T> execAsync(final Statement statement, final T inputs) {
         return execAsync(statement, inputs, handler);
     }
+
     /**
      * Asynchronously executes the specified batch statement. Inputs will be passed to
      * the {@link #handler} once query succeed or failed.
@@ -138,6 +144,117 @@ public class AsyncExecutor<T> implements Serializable {
     }
 
     /**
+     * Asynchronously executes the specified select statements. Results will be passed to the {@link AsyncResultSetHandler}
+     * once each query has succeeded or failed.
+     */
+    public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Semaphore throttle, final AsyncResultSetHandler<T> handler) {
+
+        final SettableFuture<List<T>> settableFuture = SettableFuture.create();
+        if (inputs.isEmpty()) {
+            settableFuture.set(new ArrayList<T>());
+            return settableFuture;
+        }
+
+        // The completion latch is sized from the inputs while the loop below walks the statements;
+        // with mismatched sizes the returned future could never complete correctly, so fail it up front.
+        if (statements.size() != inputs.size()) {
+            settableFuture.setException(new IllegalArgumentException(String.format(
+                    "Expected one statement per input but got %d statements and %d inputs",
+                    statements.size(), inputs.size())));
+            return settableFuture;
+        }
+
+        final AsyncContext<T> asyncContext = new AsyncContext<>(inputs, throttle, settableFuture);
+        for (int i = 0; i < statements.size(); i++) {
+
+            // Blocks until a throttle permit is free. Returns false (after counting the statement
+            // as done) once any earlier statement has recorded an exception.
+            if (!asyncContext.acquire()) {
+                continue;
+            }
+
+            try {
+                pending.incrementAndGet();
+                final T input = inputs.get(i);
+                final Statement statement = statements.get(i);
+                ResultSetFuture future = session.executeAsync(statement);
+                Futures.addCallback(future, new FutureCallback<ResultSet>() {
+                    @Override
+                    public void onSuccess(ResultSet result) {
+                        try {
+                            handler.success(input, result);
+                        } catch (Throwable throwable) {
+                            // A failing handler fails the whole batch.
+                            asyncContext.exception(throwable);
+                        } finally {
+                            pending.decrementAndGet();
+                            asyncContext.release();
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        try {
+                            handler.failure(throwable, input);
+                        } catch (Throwable throwable2) {
+                            asyncContext.exception(throwable2);
+                        }
+                        finally {
+                            // Record the failure before releasing so that it is visible when the
+                            // last release completes the future.
+                            asyncContext
+                                    .exception(throwable)
+                                    .release();
+                            pending.decrementAndGet();
+                            LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable);
+                        }
+                    }
+                }, executorService);
+            } catch (Throwable throwable) {
+                // Submission itself failed. Do NOT break out of the loop: every remaining statement
+                // must still pass through acquire(), which skips it and counts it down. Otherwise
+                // the latch never reaches zero and the returned future never completes.
+                asyncContext.exception(throwable)
+                        .release();
+                pending.decrementAndGet();
+            }
+
+        }
+
+        return settableFuture;
+    }
+
+    /**
+     * Book-keeping shared by all statements of one batched {@code execAsync} call: counts down the
+     * outstanding statements, collects their exceptions, hands throttle permits back and completes
+     * the batch future once every statement has either finished or been skipped.
+     */
+    private static class AsyncContext<T> {
+        private final List<T> inputs;
+        private final SettableFuture<List<T>> future;
+        private final AtomicInteger latch;
+        private final List<Throwable> exceptions;
+        private final Semaphore throttle;
+
+        public AsyncContext(List<T> inputs, Semaphore throttle, SettableFuture<List<T>> settableFuture) {
+            this.inputs = inputs;
+            this.latch = new AtomicInteger(inputs.size());
+            this.throttle = throttle;
+            this.exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+            this.future = settableFuture;
+        }
+
+        /**
+         * Waits for a throttle permit.
+         *
+         * @return {@code true} if the caller may submit its statement (and must later call
+         *         {@link #release()}); {@code false} if an exception has already been recorded, in
+         *         which case the statement is counted as done and the permit is returned here.
+         */
+        public boolean acquire() {
+            throttle.acquireUninterruptibly();
+            // Don't start new requests if there is an exception
+            if (!exceptions.isEmpty()) {
+                // A skipped statement may be the last outstanding one, so it must be able to
+                // complete the future as well.
+                countDown();
+                throttle.release();
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Marks one submitted statement as finished and returns its throttle permit.
+         */
+        public AsyncContext<T> release() {
+            countDown();
+            throttle.release();
+            return this;
+        }
+
+        /**
+         * Records a failure; once at least one is recorded no further statements are started.
+         */
+        public AsyncContext<T> exception(Throwable throwable) {
+            this.exceptions.add(throwable);
+            return this;
+        }
+
+        /**
+         * Decrements the outstanding-statement count and, when it reaches zero, completes the
+         * future with the inputs or with a {@link MultiFailedException} holding every failure.
+         */
+        private void countDown() {
+            if (latch.decrementAndGet() == 0) {
+                if (exceptions.isEmpty()) {
+                    future.set(inputs);
+                }
+                else {
+                    future.setException(new MultiFailedException(exceptions));
+                }
+            }
+        }
+    }
+
+    /**
      * Returns the number of currently executed tasks which are not yet completed.
      */
     public int getPendingTasksSize() {
@@ -150,4 +267,48 @@ public class AsyncExecutor<T> implements Serializable {
             this.executorService.shutdownNow();
         }
     }
+
+    /**
+     * A {@link FailedException} that carries every exception collected while executing a batch of
+     * statements. The first collected exception is used as the cause.
+     */
+    public static class MultiFailedException extends FailedException {
+        private final List<Throwable> exceptions;
+
+        /**
+         * @param exceptions the collected failures; must contain at least one element because the
+         *                   first one is used as the cause.
+         */
+        public MultiFailedException(List<Throwable> exceptions) {
+            super(getMessage(exceptions), exceptions.get(0));
+            this.exceptions = exceptions;
+        }
+
+        /**
+         * Builds the exception message from the messages of at most the first five failures.
+         */
+        private static String getMessage(List<Throwable> exceptions) {
+            int top5 = Math.min(exceptions.size(), 5);
+            StringBuilder sb = new StringBuilder();
+            sb.append("First ")
+                    .append(top5)
+                    .append(" exceptions: ")
+                    .append(System.lineSeparator());
+            for (int i = 0; i < top5; i++) {
+                sb.append(exceptions.get(i).getMessage())
+                        .append(System.lineSeparator());
+            }
+            return sb.toString();
+        }
+
+        /**
+         * Returns the summary message followed by the string form of every collected exception.
+         */
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append(getMessage())
+                    .append(System.lineSeparator())
+                    .append("Multiple exceptions encountered: ")
+                    .append(System.lineSeparator());
+
+            for (Throwable exception : exceptions) {
+                sb.append(exception.toString())
+                        .append(System.lineSeparator());
+            }
+
+            // Return the text assembled above; it was previously built and then discarded in
+            // favour of super.toString().
+            return sb.toString();
+        }
+
+        /**
+         * @return all collected exceptions, in the order they were recorded.
+         */
+        public List<Throwable> getExceptions() {
+            return exceptions;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
index 0c684c0..f4b7277 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
@@ -31,7 +31,7 @@ public class AsyncExecutorProvider {
      * Returns a new {@link AsyncExecutor} per storm executor.
      */
     public static <T> AsyncExecutor getLocal(Session session, AsyncResultHandler<T> handler) {
-        AsyncExecutor<T> executor = localAsyncExecutor.get();
+        AsyncExecutor<T> executor = localAsyncExecutor.<T>get();
         if( executor == null ) {
             localAsyncExecutor.set(executor = new AsyncExecutor<>(session, handler));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
new file mode 100644
index 0000000..8ccb400
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+
+import java.io.Serializable;
+
+/**
+ * Callback notified with the outcome of each statement of a batch asynchronous execution.
+ */
+public interface AsyncResultSetHandler<T> extends Serializable {
+
+    /**
+     * Handler that ignores both outcomes.
+     */
+    AsyncResultSetHandler NO_OP_HANDLER = new AsyncResultSetHandler() {
+        @Override
+        public void success(Object inputs, ResultSet resultSet) {
+            // nothing to do
+        }
+
+        @Override
+        public void failure(Throwable t, Object inputs) {
+            // nothing to do
+        }
+    };
+
+    /**
+     * Called to acknowledge the specified inputs once their statement has succeeded.
+     *
+     * @param inputs The input the statement was executed for.
+     * @param resultSet The result set returned by the statement.
+     */
+    void success(T inputs, ResultSet resultSet);
+
+    /**
+     * Called to fail the specified inputs once their statement has failed.
+     *
+     * @param t The cause of the failure.
+     * @param inputs The input the statement was executed for.
+     */
+    void failure(Throwable t, T inputs);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
new file mode 100644
index 0000000..9b92b99
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A result set mapper that maps a list of statements and their input tuples to lists of values.
+ */
+public interface AyncCQLResultSetValuesMapper extends Serializable {
+
+    /**
+     * Maps the given statements and tuples to values.
+     *
+     * @param session the Cassandra session made available to the mapper.
+     * @param statements the statements to map.
+     * @param tuples the input tuples; presumably one per statement, in the same order (confirm
+     *               against implementations).
+     * @return a list of value lists; presumably one entry per input tuple (confirm against
+     *         implementations).
+     */
+    List<List<Values>> map(Session session, List<Statement> statements, List<ITuple> tuples);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
new file mode 100644
index 0000000..82f3b9c
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.IBackingMap;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+/**
+ * An IBackingState implementation for Cassandra.
+ *
+ * The implementation stores state as a binary blob in cassandra using a {@link Serializer}.
+ * It supports Opaque, Transactional and NonTransactional states, given a matching serializer.
+ *
+ * Configuration is done with three separate constructs:
+ *  - One tuple mapper for multiGet, which should map keys to a select statement and return {@link Values}.
+ *  - One state mapper, which maps the state to/from a {@link Values} representation, which is used for binding.
+ *  - One tuple mapper for multiPut, which should map {@link Values} to an INSERT or UPDATE statement.
+ *
+ * {@link #multiPut(List, List)} updates Cassandra with parallel statements.
+ * {@link #multiGet(List)} queries Cassandra with parallel statements.
+ *
+ * Parallelism defaults to half the maximum requests per host, either local or remote whichever is
+ * lower. The driver defaults to 256 for remote hosts and 1024 for local hosts, so the default value is 128
+ * unless the driver is configured otherwise.
+ *
+ * @param <T>
+ */
+public class CassandraBackingMap<T> implements IBackingMap<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraBackingMap.class);
+
+    private final Map conf;
+    private final Options<T> options;
+    private final Fields allFields;
+
+    private SimpleClient client;
+    private Session session;
+    private AyncCQLResultSetValuesMapper getResultMapper;
+    private AyncCQLResultSetValuesMapper putResultMapper;
+    private Semaphore throttle;
+
+
+    protected CassandraBackingMap(Map conf, Options<T> options) {
+        this.conf = conf;
+        this.options = options;
+        List<String> allFields = options.keyFields.toList();
+        allFields.addAll(options.stateMapper.getStateFields().toList());
+        this.allFields = new Fields(allFields);
+    }
+
+    public void prepare() {
+        LOG.info("Preparing state for {}", options.toString());
+        Preconditions.checkNotNull(options.getMapper, "CassandraBackingMap.Options should have getMapper");
+        Preconditions.checkNotNull(options.putMapper, "CassandraBackingMap.Options should have putMapper");
+        client = options.clientProvider.getClient(conf);
+        session = client.connect();
+        if (options.maxParallelism == null || options.maxParallelism <= 0) {
+            PoolingOptions po = session.getCluster().getConfiguration().getPoolingOptions();
+            Integer maxRequestsPerHost = Math.min(
+                    po.getMaxConnectionsPerHost(HostDistance.LOCAL) * po.getMaxRequestsPerConnection(HostDistance.LOCAL),
+                    po.getMaxConnectionsPerHost(HostDistance.REMOTE) * po.getMaxRequestsPerConnection(HostDistance.REMOTE)
+            );
+            options.maxParallelism = maxRequestsPerHost / 2;
+            LOG.info("Parallelism default set to {}", options.maxParallelism);
+        }
+        throttle = new Semaphore(options.maxParallelism, false);
+        this.getResultMapper = new TridentAyncCQLResultSetValuesMapper(options.stateMapper.getStateFields(), throttle);
+        this.putResultMapper = new TridentAyncCQLResultSetValuesMapper(null, throttle);
+    }
+
+    @Override
+    public List<T> multiGet(List<List<Object>> keys) {
+        LOG.debug("multiGet fetching {} values.", keys.size());
+        List<Statement> selects = new ArrayList<>();
+        List<ITuple> keyTuples = new ArrayList<>();
+
+        for (int i = 0; i < keys.size(); i++) {
+            SimpleTuple keyTuple = new SimpleTuple(options.keyFields, keys.get(i));
+            List<Statement> mappedStatements = options.getMapper.map(conf, session, keyTuple);
+            if (mappedStatements.size() > 1) {
+                throw new IllegalArgumentException("Only one statement per map state item is supported.");
+            }
+            selects.add(mappedStatements.size() == 1 ? mappedStatements.get(0) : null);
+            keyTuples.add(keyTuple);
+        }
+
+        List<List<Values>> results = getResultMapper
+                .map(session, selects, keyTuples);
+
+        List<T> states = new ArrayList<>();
+        for (List<Values> values : results) {
+            T state = (T) options.stateMapper.fromValues(values);
+            states.add(state);
+        }
+
+        return states;
+
+    }
+
+    @Override
+    public void multiPut(List<List<Object>> keys, List<T> values) {
+        LOG.debug("multiPut writing {} values.", keys.size());
+
+        List<Statement> statements = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i++) {
+            Values stateValues = options.stateMapper.toValues(values.get(i));
+            SimpleTuple tuple = new SimpleTuple(allFields, keys.get(i), stateValues);
+            statements.addAll(options.putMapper.map(conf, session, tuple));
+        }
+
+        try {
+            putResultMapper.map(session, statements, null);
+        } catch (Exception e) {
+            LOG.warn("Write operation failed: {}", e.getMessage());
+            throw new FailedException(e);
+        }
+    }
+
+    public static final class Options<T> implements Serializable {
+        private final SimpleClientProvider clientProvider;
+        private Fields keyFields;
+        private StateMapper stateMapper;
+        private CQLStatementTupleMapper getMapper;
+        private CQLStatementTupleMapper putMapper;
+        private Integer maxParallelism = 128;
+
+        public Options(SimpleClientProvider clientProvider) {
+            this.clientProvider = clientProvider;
+        }
+
+        public Options<T> withKeys(Fields keyFields) {
+            this.keyFields = keyFields;
+            return this;
+        }
+
+        public Options<T> withStateMapper(StateMapper<T> stateMapper) {
+            this.stateMapper = stateMapper;
+            return this;
+        }
+
+        public Options<T> withNonTransactionalJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONNonTransactionalSerializer());
+            return this;
+        }
+
+        public Options<T> withNonTransactionalBinaryState(String fieldName, Serializer<T> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        public Options<T> withTransactionalJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONTransactionalSerializer());
+            return this;
+        }
+
+        public Options<T> withTransactionalBinaryState(String fieldName, Serializer<TransactionalValue<T>> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        public Options<T> withOpaqueJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONOpaqueSerializer());
+            return this;
+        }
+
+        public Options<T> withOpaqueBinaryState(String fieldName, Serializer<OpaqueValue<T>> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        public Options<T> withGetMapper(CQLStatementTupleMapper getMapper) {
+            this.getMapper = getMapper;
+            return this;
+        }
+
+        public Options<T> withPutMapper(CQLStatementTupleMapper putMapper) {
+            this.putMapper = putMapper;
+            return this;
+        }
+
+        public Options<T> withMaxParallelism(Integer maxParallelism) {
+            this.maxParallelism = maxParallelism;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s: [keys: %s, StateMapper: %s, getMapper: %s, putMapper: %s, maxParallelism: %d",
+                    this.getClass().getSimpleName(),
+                    keyFields,
+                    stateMapper,
+                    getMapper,
+                    putMapper,
+                    maxParallelism
+            );
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
new file mode 100644
index 0000000..abd9477
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.CachedMap;
+import org.apache.storm.trident.state.map.IBackingMap;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.NonTransactionalMap;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
+
+import java.util.Map;
+
+/**
+ * StateFactory that produces a MapState whose storage is a {@link CassandraBackingMap}.
+ *
+ * Instances are obtained through {@link #opaque}, {@link #transactional} or {@link #nonTransactional},
+ * matching the three supported state types. Calling {@link #withCache} with a positive size wraps the
+ * backing map in a {@link CachedMap}; without it no cache is used.
+ *
+ */
+public class CassandraMapStateFactory implements StateFactory {
+
+    private final StateType stateType;
+    private final CassandraBackingMap.Options options;
+    private int cacheSize;
+    private Map cassandraConfig;
+
+    private CassandraMapStateFactory(StateType stateType, CassandraBackingMap.Options options, Map cassandraConfig) {
+        this.stateType = stateType;
+        this.options = options;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    /** Creates a factory for opaque state. */
+    public static CassandraMapStateFactory opaque(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.OPAQUE, options, cassandraConfig);
+    }
+
+    /** Creates a factory for transactional state. */
+    public static CassandraMapStateFactory transactional(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.TRANSACTIONAL, options, cassandraConfig);
+    }
+
+    /** Creates a factory for non-transactional state. */
+    public static CassandraMapStateFactory nonTransactional(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.NON_TRANSACTIONAL, options, cassandraConfig);
+    }
+
+    /** Sets the cache size; a value greater than zero enables the {@link CachedMap} wrapper. */
+    public CassandraMapStateFactory withCache(int cacheSize) {
+        this.cacheSize = cacheSize;
+        return this;
+    }
+
+    /**
+     * Builds and prepares a Cassandra backing map, optionally caches it, and wraps it in the
+     * MapState flavour that corresponds to this factory's state type.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+
+        CassandraBackingMap cassandraMap = new CassandraBackingMap(cassandraConfig, options);
+        cassandraMap.prepare();
+
+        IBackingMap store;
+        if (cacheSize > 0) {
+            store = new CachedMap<>(cassandraMap, cacheSize);
+        } else {
+            store = cassandraMap;
+        }
+
+        switch (stateType) {
+            case OPAQUE:
+                return OpaqueMap.build((IBackingMap<OpaqueValue>) store);
+
+            case TRANSACTIONAL:
+                return TransactionalMap.build((IBackingMap<TransactionalValue>) store);
+
+            case NON_TRANSACTIONAL:
+                return NonTransactionalMap.build(store);
+
+            default:
+                throw new IllegalArgumentException("Invalid state provided " + stateType);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
new file mode 100644
index 0000000..c371fdb
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.Select;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+
+/**
+ * A helper for building a MapState backed by Cassandra. It internalizes some common
+ * implementation choices to simplify usage.
+ *
+ * In the simplest use case, a map state can be constructed with:
+ *
+ * StateFactory mapState = MapStateFactoryBuilder.opaque(cassandraConf)
+ *     .withTable("mykeyspace", "year_month_state")
+ *     .withKeys("year", "month")
+ *     .withJSONBinaryState("state")
+ *     .build();
+ *
+ * for a cassandra table with:
+ * mykeyspace.year_month_state {
+ *     year: int,
+ *     month: int,
+ *     state: blob
+ * }
+ *
+ * This will use the storm JSON serializers to convert the state to and from binary format.
+ * Other binary serializers can be used with the {@link #withBinaryState(String, Serializer)} method.
+ *
+ * Storing state in explicit fields (e.g. in a field "sum" of type int) is possible by instead calling
+ * {@link #withStateMapper(StateMapper)}. For instance, you can use {@link NonTransactionalTupleStateMapper},
+ * {@link TransactionalTupleStateMapper} or {@link OpaqueTupleStateMapper} if your state values are tuples.
+ *
+ * @param <T> the type of the state values handled by the configured {@link StateMapper}
+ */
+public class MapStateFactoryBuilder<T> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MapStateFactoryBuilder.class);
+
+    private String keyspace;
+    private String table;
+    private String[] keys;
+    private Integer maxParallelism;
+    private StateType stateType;
+    private StateMapper<T> stateMapper;
+    private Map cassandraConfig;
+    private int cacheSize;
+
+    /** Starts a builder for opaque state using the given Cassandra configuration. */
+    public static <U> MapStateFactoryBuilder<OpaqueValue<U>> opaque(Map cassandraConf) {
+        return new MapStateFactoryBuilder<OpaqueValue<U>>()
+                .withStateType(StateType.OPAQUE)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /** Starts a builder for transactional state using the given Cassandra configuration. */
+    public static <U> MapStateFactoryBuilder<TransactionalValue<U>> transactional(Map cassandraConf) {
+        return new MapStateFactoryBuilder<TransactionalValue<U>>()
+                .withStateType(StateType.TRANSACTIONAL)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /** Starts a builder for non-transactional state using the given Cassandra configuration. */
+    public static <U> MapStateFactoryBuilder<U> nontransactional(Map cassandraConf) {
+        return new MapStateFactoryBuilder<U>()
+                .withStateType(StateType.NON_TRANSACTIONAL)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    public MapStateFactoryBuilder<T> withTable(String keyspace, String table) {
+        this.keyspace = keyspace;
+        this.table = table;
+        return this;
+    }
+
+    public MapStateFactoryBuilder<T> withKeys(String... keys) {
+        this.keys = keys;
+        return this;
+    }
+
+    public MapStateFactoryBuilder<T> withMaxParallelism(Integer maxParallelism) {
+        this.maxParallelism = maxParallelism;
+        return this;
+    }
+
+    /**
+     * Stores the state in a single binary field, serialized with the storm JSON serializer
+     * that matches this builder's state type.
+     *
+     * @param stateField name of the field holding the serialized state
+     * @throws NullPointerException     if no state type has been set
+     * @throws IllegalArgumentException if the state type is not one of the known types
+     */
+    @SuppressWarnings("unchecked")
+    public MapStateFactoryBuilder<T> withJSONBinaryState(String stateField) {
+        // Switching on a null enum would throw a message-less NPE; fail with an explanation instead.
+        Objects.requireNonNull(stateType, "A state type must be specified.");
+        switch (stateType) {
+            case OPAQUE:
+                return withBinaryState(stateField, (Serializer) new JSONOpaqueSerializer());
+            case TRANSACTIONAL:
+                return withBinaryState(stateField, (Serializer) new JSONTransactionalSerializer());
+            case NON_TRANSACTIONAL:
+                return withBinaryState(stateField, new JSONNonTransactionalSerializer());
+            default:
+                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
+        }
+    }
+
+    public MapStateFactoryBuilder<T> withStateMapper(StateMapper<T> stateMapper) {
+        this.stateMapper = stateMapper;
+        return this;
+    }
+
+    /** Stores the state in a single binary field using the given serializer. */
+    public MapStateFactoryBuilder<T> withBinaryState(String stateField, Serializer<T> serializer) {
+        return withStateMapper(new SerializedStateMapper<>(stateField, serializer));
+    }
+
+    protected MapStateFactoryBuilder<T> withStateType(StateType stateType) {
+        this.stateType = stateType;
+        return this;
+    }
+
+    protected MapStateFactoryBuilder<T> withCassandraConfig(Map cassandraConf) {
+        this.cassandraConfig = cassandraConf;
+        return this;
+    }
+
+    public MapStateFactoryBuilder<T> withCache(int cacheSize) {
+        this.cacheSize = cacheSize;
+        return this;
+    }
+
+    /**
+     * Builds the state factory: a select over the state fields filtered by every key, an insert of
+     * all key and state fields, and a {@link CassandraMapStateFactory} of the configured state type.
+     *
+     * @return the configured state factory, never {@code null}
+     * @throws NullPointerException     if keyspace, table, keys, state mapper or state type is missing
+     * @throws IllegalArgumentException if no key is given or the state type is not one of the known types
+     */
+    public StateFactory build() {
+
+        Objects.requireNonNull(keyspace, "A keyspace is required.");
+        Objects.requireNonNull(table, "A table name is required.");
+        Objects.requireNonNull(keys, "At least one key must be specified.");
+        if (keys.length == 0) {
+            throw new IllegalArgumentException("At least one key must be specified.");
+        }
+        Objects.requireNonNull(stateMapper, "A state mapper must be specified.");
+        Objects.requireNonNull(stateType, "A state type must be specified.");
+
+        List<String> stateFields = stateMapper.getStateFields()
+                .toList();
+
+        String[] stateFieldsArray = stateFields.toArray(new String[stateFields.size()]);
+
+        List<String> allFields = new ArrayList<>();
+        Collections.addAll(allFields, keys);
+        allFields.addAll(stateFields);
+
+        // Build get query
+        Select.Where getQuery = select(stateFieldsArray)
+                .from(keyspace, table)
+                .where();
+
+        for (String key : keys) {
+            getQuery.and(eq(key, bindMarker()));
+        }
+
+        CQLStatementTupleMapper get = boundQuery(getQuery.toString())
+                .bind(all())
+                .build();
+
+        // Build put query
+        Insert putStatement = insertInto(keyspace, table)
+                .values(allFields, Collections.<Object>nCopies(allFields.size(), bindMarker()));
+
+        CQLStatementTupleMapper put = boundQuery(putStatement.toString())
+                .bind(all())
+                .build();
+
+        CassandraBackingMap.Options options = new CassandraBackingMap.Options<T>(new CassandraContext())
+                .withGetMapper(get)
+                .withPutMapper(put)
+                .withStateMapper(stateMapper)
+                .withKeys(new Fields(keys))
+                .withMaxParallelism(maxParallelism);
+
+        logger.debug("Building factory with: \n  get: {}\n  put: {}\n  mapper: {}",
+                getQuery.toString(),
+                putStatement.toString(),
+                stateMapper.toString());
+
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return CassandraMapStateFactory.nonTransactional(options, cassandraConfig)
+                        .withCache(cacheSize);
+            case TRANSACTIONAL:
+                return CassandraMapStateFactory.transactional(options, cassandraConfig)
+                        .withCache(cacheSize);
+            case OPAQUE:
+                return CassandraMapStateFactory.opaque(options, cassandraConfig)
+                        .withCache(cacheSize);
+            default:
+                // Fail loudly rather than hand a null factory to the topology.
+                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
new file mode 100644
index 0000000..3a36b07
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * State mapper that maps a tuple to separate state fields.
+ */
+public class NonTransactionalTupleStateMapper implements StateMapper<ITuple> {
+
+    private final Fields fields;
+
+    public NonTransactionalTupleStateMapper(String... fields) {
+        this.fields = new Fields(fields);
+    }
+
+    public NonTransactionalTupleStateMapper(Fields fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return fields;
+    }
+
+    /**
+     * Converts the tuple into one value per tuple field.
+     *
+     * @param t the state tuple
+     * @return the tuple's values, in tuple order
+     */
+    @Override
+    public Values toValues(ITuple t) {
+        // Values has a varargs constructor: passing the List itself would yield a single element
+        // holding the whole list, so spread it as an array to get one element per field.
+        return new Values(t.getValues().toArray());
+    }
+
+    /**
+     * Builds the state tuple from the first row of values.
+     *
+     * @param values the rows read for a key
+     * @return a tuple over the state fields, or {@code null} when there are no rows
+     */
+    @Override
+    public ITuple fromValues(List<Values> values) {
+        if (values == null || values.isEmpty()) {
+            return null;
+        }
+        return new SimpleTuple(fields, values.get(0));
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), fields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
new file mode 100644
index 0000000..882c9b1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Maps an opaque tuple state onto individual table fields: the current transaction id,
+ * then the prefixed current-value fields, then the prefixed previous-value fields.
+ */
+public class OpaqueTupleStateMapper implements StateMapper<OpaqueValue<ITuple>> {
+
+    private final Fields tupleFields;
+    private final Fields tableFields;
+
+    public OpaqueTupleStateMapper(String currTxIdField, String currPrefix, String prevPrefix, String... fields) {
+        this(currTxIdField, currPrefix, prevPrefix, new Fields(fields));
+    }
+
+    public OpaqueTupleStateMapper(String currTxIdField, String currPrefix, String prevPrefix, Fields fields) {
+        this.tupleFields = fields;
+        List<String> columns = new ArrayList<>();
+        columns.add(currTxIdField);
+        // Current-value columns come first, previous-value columns second.
+        for (String prefix : new String[] {currPrefix, prevPrefix}) {
+            for (String name : fields) {
+                columns.add(prefix + name);
+            }
+        }
+        this.tableFields = new Fields(columns);
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return tableFields;
+    }
+
+    /**
+     * Flattens the opaque value into a row: transaction id, current values, previous values.
+     * A missing current or previous tuple contributes one null per tuple field.
+     */
+    @Override
+    public Values toValues(OpaqueValue<ITuple> tuple) {
+        Values row = new Values();
+        row.add(tuple.getCurrTxid());
+        appendFields(row, tuple.getCurr());
+        appendFields(row, tuple.getPrev());
+        return row;
+    }
+
+    /**
+     * Rebuilds the opaque value from the first row; returns null when there are no rows.
+     * A current or previous tuple whose values are all null is represented as null.
+     */
+    @Override
+    public OpaqueValue<ITuple> fromValues(List<Values> valuesList) {
+        if (valuesList == null || valuesList.isEmpty()) {
+            return null;
+        }
+        Values row = valuesList.get(0);
+        Long currTx = (Long) row.get(0);
+
+        int width = tupleFields.size();
+        SimpleTuple curr = readTuple(row, 1);
+        SimpleTuple prev = readTuple(row, 1 + width);
+
+        return new OpaqueValue<ITuple>(currTx, curr, prev);
+    }
+
+    /** Appends one value per tuple field taken from source, or nulls when source is null. */
+    private void appendFields(Values row, ITuple source) {
+        for (String name : tupleFields) {
+            row.add(source == null ? null : source.getValueByField(name));
+        }
+    }
+
+    /** Reads one tuple's worth of values starting at offset; yields null if every value is null. */
+    private SimpleTuple readTuple(Values row, int offset) {
+        SimpleTuple tuple = new SimpleTuple(tupleFields);
+        int position = offset;
+        for (String name : tupleFields) {
+            tuple.put(name, row.get(position++));
+        }
+        return isAllNull(tuple) ? null : tuple;
+    }
+
+    private boolean isAllNull(SimpleTuple tuple) {
+        boolean allNull = true;
+        for (Object value : tuple.getValues()) {
+            if (value != null) {
+                allNull = false;
+                break;
+            }
+        }
+        return allNull;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), tableFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
new file mode 100644
index 0000000..b4ec6c8
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * State mapper that stores a value in a single binary state field, using a
+ * {@link Serializer} to convert between the value and its byte representation.
+ *
+ * @param <T> type of the state value handled by the serializer
+ */
+public class SerializedStateMapper<T> implements StateMapper<T> {
+
+    private final Fields stateFields;
+    private final Serializer<T> serializer;
+
+    /**
+     * @param fieldName  name of the single state field holding the serialized value
+     * @param serializer serializer used for both directions of the conversion
+     */
+    public SerializedStateMapper(String fieldName, Serializer<T> serializer) {
+        this.stateFields = new Fields(fieldName);
+        this.serializer = serializer;
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return stateFields;
+    }
+
+    /**
+     * Serializes the value and wraps the resulting bytes in a {@link ByteBuffer}.
+     *
+     * @param value value to serialize
+     * @return a single-element {@link Values} holding the wrapped bytes
+     */
+    @Override
+    public Values toValues(T value) {
+        byte[] serialized = serializer.serialize(value);
+        return new Values(ByteBuffer.wrap(serialized));
+    }
+
+    /**
+     * Deserializes the state value from the first column of a single result row.
+     *
+     * @param values rows returned by the state query; may be {@code null} or empty
+     * @return the deserialized value, or {@code null} when there is no row or the column is {@code null}
+     * @throws IllegalArgumentException if more than one row is supplied
+     */
+    @Override
+    public T fromValues(List<Values> values) {
+        if (values == null || values.isEmpty()) {
+            return null;
+        }
+        if (values.size() > 1) {
+            throw new IllegalArgumentException("Can only convert single values, " + values.size() + " encountered");
+        }
+
+        ByteBuffer bytes = (ByteBuffer) values.get(0).get(0);
+        if (bytes == null) {
+            return null;
+        }
+
+        // ByteBuffer.array() exposes the entire backing array regardless of position, limit
+        // and array offset, and is unavailable for read-only or direct buffers. Copy exactly
+        // the readable bytes instead; duplicate() keeps the caller's buffer position untouched.
+        byte[] serialized = new byte[bytes.remaining()];
+        bytes.duplicate().get(serialized);
+        return serializer.deserialize(serialized);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s, serializer: %s}", this.getClass().getSimpleName(), stateFields, serializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
new file mode 100644
index 0000000..cc03a09
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class SimpleStateMapper<T> implements StateMapper<T> {
+
+    private final Fields fields;
+    private final StateType stateType;
+
+    public SimpleStateMapper(Fields fields, StateType stateType) {
+        this.fields = fields;
+        this.stateType = stateType;
+    }
+
+    public static <U> StateMapper<OpaqueValue<U>> opaque(String txIdField, String previousField, String field) {
+        return new SimpleStateMapper<>(new Fields(txIdField, field, previousField), StateType.OPAQUE);
+    }
+
+    public static <U> StateMapper<TransactionalValue<U>> opaque(String txIdField, String field) {
+        return new SimpleStateMapper<>(new Fields(txIdField, field), StateType.TRANSACTIONAL);
+    }
+
+    public static <U> StateMapper<U> nontransactional(String field) {
+        return new SimpleStateMapper<>(new Fields(field), StateType.NON_TRANSACTIONAL);
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return fields;
+    }
+
+    @Override
+    public Values toValues(T value) {
+        if (value == null) {
+            return null;
+        }
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return new Values(value);
+            case TRANSACTIONAL:
+                TransactionalValue transactional = (TransactionalValue) value;
+                return new Values(transactional.getTxid(), transactional.getVal());
+            case OPAQUE:
+                OpaqueValue opaque = (OpaqueValue) value;
+                return new Values(opaque.getCurrTxid(), opaque.getCurr(), opaque.getPrev());
+            default:
+                throw new IllegalStateException("Unknown state type " + stateType);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T fromValues(List<Values> valuesSet) {
+        if (valuesSet == null || valuesSet.size() == 0) {
+            return null;
+        }
+        else if (valuesSet.size() == 1) {
+            Values values = valuesSet.get(0);
+            if (values == null) {
+                return null;
+            }
+            switch (stateType) {
+                case NON_TRANSACTIONAL:
+                    return (T) values.get(0);
+                case TRANSACTIONAL:
+                    return (T) new TransactionalValue((Long) values.get(0), values.get(1));
+                case OPAQUE:
+                    return (T) new OpaqueValue((Long) values.get(0), values.get(1), values.get(2));
+                default:
+                    throw new IllegalStateException("Unknown state type " + stateType);
+            }
+        }
+        throw new IllegalStateException("State query returned multiple results.");
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s, stateType: %s}", this.getClass().getSimpleName(), fields, stateType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
new file mode 100644
index 0000000..d78f6d7
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class for passing around ordered key/value data with an immutable key set.
+ */
+/**
+ * Utility class for passing around ordered key/value data with an immutable key set.
+ * <p>
+ * Values are stored positionally, parallel to the keys. Whenever values are supplied
+ * in bulk and there are fewer of them than keys, the remainder is filled with
+ * {@code null}. The typed accessors are plain casts of the stored object.
+ */
+public class SimpleTuple implements ITuple, Serializable {
+
+    private static final long serialVersionUID = -4656331293513898312L;
+
+    private final List<String> keys;
+    private List<Object> values;
+
+    /**
+     * @param keyFields fields naming the keys of this tuple, in order
+     * @param values    initial values, matched to the keys by position
+     */
+    public SimpleTuple(Fields keyFields, List<Object> values) {
+        this.keys = keyFields.toList();
+        this.values = new ArrayList<>(values);
+        padValuesToKeyCount();
+    }
+
+    /**
+     * @param keyFields fields naming the keys of this tuple, in order
+     * @param values    zero or more value lists, concatenated in order and matched to the keys by position
+     */
+    @SafeVarargs
+    public SimpleTuple(Fields keyFields, List<Object>... values) {
+        this.keys = keyFields.toList();
+        this.values = new ArrayList<>();
+        for (List<Object> valueList : values) {
+            this.values.addAll(valueList);
+        }
+        padValuesToKeyCount();
+    }
+
+    /**
+     * Sets the value stored under the given key.
+     *
+     * @return this tuple, for chaining
+     * @throws IllegalArgumentException if the key is not part of this tuple
+     */
+    public SimpleTuple put(String key, Object value) {
+        values.set(requireFieldIndex(key), value);
+        return this;
+    }
+
+    /**
+     * Replaces all values with a copy of the given list, matched to the keys by position.
+     *
+     * @return this tuple, for chaining
+     */
+    public SimpleTuple setValues(List<Object> values) {
+        this.values = new ArrayList<>(values);
+        // Keep the same invariant the constructors establish: one slot per key.
+        padValuesToKeyCount();
+        return this;
+    }
+
+    @Override
+    public int size() {
+        return keys.size();
+    }
+
+    @Override
+    public boolean contains(String field) {
+        return keys.contains(field);
+    }
+
+    @Override
+    public Fields getFields() {
+        return new Fields(keys);
+    }
+
+    /**
+     * @return the position of the field, or {@code -1} if it is not part of this tuple
+     */
+    @Override
+    public int fieldIndex(String field) {
+        return keys.indexOf(field);
+    }
+
+    /**
+     * @return the values of the selected fields, in selector order
+     * @throws IllegalArgumentException if a selected field is not part of this tuple
+     */
+    @Override
+    public List<Object> select(Fields selector) {
+        List<Object> selected = new ArrayList<>();
+        for (String field : selector) {
+            selected.add(getValueByField(field));
+        }
+        return selected;
+    }
+
+    @Override
+    public Object getValue(int i) {
+        return values.get(i);
+    }
+
+    @Override
+    public String getString(int i) {
+        return (String) values.get(i);
+    }
+
+    @Override
+    public Integer getInteger(int i) {
+        return (Integer) values.get(i);
+    }
+
+    @Override
+    public Long getLong(int i) {
+        return (Long) values.get(i);
+    }
+
+    @Override
+    public Boolean getBoolean(int i) {
+        return (Boolean) values.get(i);
+    }
+
+    @Override
+    public Short getShort(int i) {
+        return (Short) values.get(i);
+    }
+
+    @Override
+    public Byte getByte(int i) {
+        return (Byte) values.get(i);
+    }
+
+    @Override
+    public Double getDouble(int i) {
+        return (Double) values.get(i);
+    }
+
+    @Override
+    public Float getFloat(int i) {
+        return (Float) values.get(i);
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        return (byte[]) values.get(i);
+    }
+
+    /**
+     * @throws IllegalArgumentException if the field is not part of this tuple
+     */
+    @Override
+    public Object getValueByField(String field) {
+        return values.get(requireFieldIndex(field));
+    }
+
+    @Override
+    public String getStringByField(String field) {
+        return (String) getValueByField(field);
+    }
+
+    @Override
+    public Integer getIntegerByField(String field) {
+        return (Integer) getValueByField(field);
+    }
+
+    @Override
+    public Long getLongByField(String field) {
+        return (Long) getValueByField(field);
+    }
+
+    @Override
+    public Boolean getBooleanByField(String field) {
+        return (Boolean) getValueByField(field);
+    }
+
+    @Override
+    public Short getShortByField(String field) {
+        return (Short) getValueByField(field);
+    }
+
+    @Override
+    public Byte getByteByField(String field) {
+        return (Byte) getValueByField(field);
+    }
+
+    @Override
+    public Double getDoubleByField(String field) {
+        return (Double) getValueByField(field);
+    }
+
+    @Override
+    public Float getFloatByField(String field) {
+        return (Float) getValueByField(field);
+    }
+
+    @Override
+    public byte[] getBinaryByField(String field) {
+        return (byte[]) getValueByField(field);
+    }
+
+    /**
+     * @return a read-only view of the values
+     */
+    @Override
+    public List<Object> getValues() {
+        return Collections.unmodifiableList(values);
+    }
+
+    /**
+     * @return a read-only view of the keys
+     */
+    public List<String> getKeys() {
+        return Collections.unmodifiableList(keys);
+    }
+
+    /** Appends {@code null} values until there is at least one value slot per key. */
+    private void padValuesToKeyCount() {
+        while (values.size() < keys.size()) {
+            values.add(null);
+        }
+    }
+
+    /** Returns the position of the field, rejecting fields that are not part of this tuple. */
+    private int requireFieldIndex(String field) {
+        int index = keys.indexOf(field);
+        if (index < 0) {
+            throw new IllegalArgumentException("Field " + field + " does not exist.");
+        }
+        return index;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
new file mode 100644
index 0000000..ef0c783
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Converts between a state value of type {@code T} and the positional
+ * {@link Values} kept in the state fields.
+ *
+ * @param <T> type of the state value
+ */
+public interface StateMapper<T> extends Serializable {
+
+    /**
+     * Returns the state fields this mapper works with.
+     */
+    Fields getStateFields();
+
+    /**
+     * Converts a state value into the values to store. The implementations in this
+     * package emit them in the order of {@link #getStateFields()}.
+     *
+     * @param value state value to convert
+     */
+    Values toValues(T value);
+
+    /**
+     * Converts the rows read for a key back into a state value. The implementations
+     * in this package return {@code null} when the list is empty.
+     *
+     * @param values rows read from the state, one {@link Values} per row
+     */
+    T fromValues(List<Values> values);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
new file mode 100644
index 0000000..83332b9
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * State mapper that maps a transactional tuple to separate state fields.
+ */
+/**
+ * State mapper that spreads a transactional tuple over separate state fields:
+ * the transaction id first, followed by one field per tuple field.
+ */
+public class TransactionalTupleStateMapper implements StateMapper<TransactionalValue<ITuple>> {
+
+    private final Fields tupleFields;
+    private final Fields tableFields;
+
+    /**
+     * @param txIdField name of the state field holding the transaction id
+     * @param fields    names of the tuple fields, in order
+     */
+    public TransactionalTupleStateMapper(String txIdField, String... fields) {
+        this(txIdField, new Fields(fields));
+    }
+
+    /**
+     * @param txIdField name of the state field holding the transaction id
+     * @param fields    the tuple fields, in order
+     */
+    public TransactionalTupleStateMapper(String txIdField, Fields fields) {
+        tupleFields = fields;
+        // Table layout: transaction id column followed by the tuple columns.
+        List<String> columns = new ArrayList<>(fields.toList());
+        columns.add(0, txIdField);
+        tableFields = new Fields(columns);
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return tableFields;
+    }
+
+    /**
+     * Flattens the transactional value into the transaction id followed by the value
+     * of each tuple field; a {@code null} tuple yields {@code null} for every field.
+     */
+    @Override
+    public Values toValues(TransactionalValue<ITuple> tuple) {
+        final Values row = new Values();
+        row.add(tuple.getTxid());
+
+        final ITuple payload = tuple.getVal();
+        for (int i = 0; i < tupleFields.size(); i++) {
+            row.add(payload == null ? null : payload.getValueByField(tupleFields.get(i)));
+        }
+        return row;
+    }
+
+    /**
+     * Rebuilds the transactional value from the first row of a state query result.
+     * Column 0 is the transaction id and the following columns are the tuple fields;
+     * a tuple whose columns are all {@code null} is reported as {@code null}.
+     *
+     * @return the transactional value, or {@code null} when no row was supplied
+     */
+    @Override
+    public TransactionalValue<ITuple> fromValues(List<Values> valuesList) {
+        if (valuesList == null || valuesList.isEmpty()) {
+            return null;
+        }
+
+        final Values row = valuesList.get(0);
+        final Long txId = (Long) row.get(0);
+
+        SimpleTuple restored = new SimpleTuple(tupleFields);
+        for (int i = 0; i < tupleFields.size(); i++) {
+            restored.put(tupleFields.get(i), row.get(1 + i));
+        }
+
+        return new TransactionalValue<ITuple>(txId, holdsOnlyNulls(restored) ? null : restored);
+    }
+
+    /** Tells whether the tuple has no non-null value. */
+    private static boolean holdsOnlyNulls(SimpleTuple tuple) {
+        for (Object candidate : tuple.getValues()) {
+            if (candidate != null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        final String typeName = getClass().getSimpleName();
+        return String.format("{type: %s, fields: %s}", typeName, tableFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
new file mode 100644
index 0000000..0d03fed
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.AsyncResultSetHandler;
+import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A result set mapper implementation which runs requests in parallel and waits for them all to finish.
+ */
+/**
+ * A result set mapper implementation which runs requests in parallel and waits for them all to finish.
+ */
+public class TridentAyncCQLResultSetValuesMapper implements AyncCQLResultSetValuesMapper {
+    private final Fields outputDeclaredFields;
+    private final Semaphore throttle;
+
+    /**
+     * @param outputDeclaredFields fields to emit for every result row; when {@code null} no rows are mapped
+     * @param throttle             semaphore handed to the executor together with the statements
+     */
+    public TridentAyncCQLResultSetValuesMapper(Fields outputDeclaredFields, Semaphore throttle) {
+        this.outputDeclaredFields = outputDeclaredFields;
+        this.throttle = throttle;
+    }
+
+    /**
+     * Executes all statements asynchronously and blocks until every one of them has completed.
+     *
+     * @param session    session used to obtain the executor
+     * @param statements statements to execute
+     * @param tuples     input tuples, matched to the statements by position
+     * @return one entry per statement, in statement order; an entry stays {@code null} when no
+     *         output fields were declared
+     * @throws FailedException if waiting for the results fails or is interrupted
+     */
+    @Override
+    public List<List<Values>> map(Session session, List<Statement> statements, final List<ITuple> tuples) {
+        AsyncExecutor<Integer> executor = AsyncExecutorProvider.getLocal(session, AsyncResultHandler.NO_OP_HANDLER);
+        final List<Integer> indexes = new ArrayList<>();
+        final List<List<Values>> results = new ArrayList<>();
+        // Pre-size the result list so each callback can store its rows at the statement's own position.
+        for (int i = 0; i < statements.size(); i++) {
+            indexes.add(i);
+            results.add(null);
+        }
+        SettableFuture<List<Integer>> result = executor.execAsync(statements, indexes, throttle, new AsyncResultSetHandler<Integer>() {
+            @Override
+            public void success(Integer index, ResultSet resultSet) {
+                if (outputDeclaredFields != null) {
+                    List<Values> thisResult =
+                            TridentAyncCQLResultSetValuesMapper.this.handleResult(resultSet, tuples.get(index));
+                    results.set(index, thisResult);
+                }
+            }
+
+            @Override
+            public void failure(Throwable t, Integer index) {
+                // Exceptions are captured and thrown at the end of the batch by the executor
+            }
+
+        });
+
+        try {
+            // Await all results
+            result.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new FailedException(e.getMessage(), e);
+        } catch (Exception e) {
+            throw new FailedException(e.getMessage(), e);
+        }
+
+        return results;
+    }
+
+    /**
+     * Maps every row of a result set to the declared output fields. A field present in the
+     * input tuple takes its value from the tuple; any other field is read from the row.
+     *
+     * @param resultSet rows returned for one statement
+     * @param tuple     input tuple the statement was built from
+     * @return one {@link Values} per row, in row order
+     */
+    protected List<Values> handleResult(ResultSet resultSet, ITuple tuple) {
+        List<Values> list = new ArrayList<>();
+        for (Row row : resultSet) {
+            final Values values = new Values();
+            for (String field : outputDeclaredFields) {
+                if (tuple.contains(field)) {
+                    values.add(tuple.getValueByField(field));
+                } else {
+                    values.add(row.getObject(field));
+                }
+            }
+            list.add(values);
+        }
+        return list;
+    }
+
+}