You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 00:45:55 UTC

[1/4] storm git commit: STORM-1369: Add MapState implementation to storm-cassandra.

Repository: storm
Updated Branches:
  refs/heads/master b346c1c01 -> 08e7b640d


http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/testtools/EmbeddedCassandraResource.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/testtools/EmbeddedCassandraResource.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/testtools/EmbeddedCassandraResource.java
new file mode 100644
index 0000000..3b5ce5d
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/testtools/EmbeddedCassandraResource.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright (c) 2009-2011 VMware, Inc. All Rights Reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.cassandra.testtools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ *
+ * An in-memory Cassandra storage service.
+ * Useful for unit testing
+ *
+ * This implementation is based on the springsource community project,
+ * com.springsource.insight:insight-plugin-cassandra12.
+ *
+ * {@see <a href="https://github.com/spring-projects/spring-insight-plugins/blob/c2986b457b482cd08a77a26297c087df59535067/collection-plugins/cassandra12/src/test/java/com/springsource/insight/plugin/cassandra/embeded/EmbeddedCassandraService.java">
+ *     com.springsource.insight:insight-plugin-cassandra12
+ *     </a>}
+ *
+ * It has been repurposed to a JUnit external resource by:
+ * - Extending ExternalResource instead of implementing Runnable.
+ * - Exposing the host and port to use for native connections.
+ *
+ */
+
+public class EmbeddedCassandraResource extends ExternalResource {
+    CassandraDaemon cassandraDaemon;
+
+    private final String host;
+    private final Integer nativeTransportPort;
+
+    public EmbeddedCassandraResource() {
+        try {
+            prepare();
+            cassandraDaemon = new CassandraDaemon();
+            cassandraDaemon.init(null);
+            host = DatabaseDescriptor.getRpcAddress().getHostName();
+            nativeTransportPort = DatabaseDescriptor.getNativeTransportPort();
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        cassandraDaemon.start();
+    }
+
+    @Override
+    protected void after() {
+
+        // Cassandra daemon calls System.exit() on windows, which kills the test.
+        // Stop services without killing the process instead.
+        if (FBUtilities.isWindows()) {
+            cassandraDaemon.thriftServer.stop();
+            cassandraDaemon.nativeServer.stop();
+        }
+        else {
+            cassandraDaemon.stop();
+        }
+
+        // Register file cleanup after jvm shutdown
+        // Cassandra doesn't actually shut down until jvm shutdown so need to wait for that first.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Sleep before cleaning up files
+            try {
+                Thread.sleep(3000L);
+                cleanupDataDirectories();
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        }));
+
+    }
+
+    /**
+     * Cleans all data directories, then creates them if they don't exist.
+     *
+     * @throws IOException
+     */
+    public void prepare() throws IOException {
+        // Tell cassandra where the configuration files are. Use the test configuration file.
+        System.setProperty("storage-config", "../../test/resources");
+
+        cleanupDataDirectories();
+        makeDirsIfNotExist();
+    }
+
+    /**
+     * Deletes all data from cassandra data directories, including the commit log.
+     *
+     * @throws IOException in case of permissions error etc.
+     */
+    public void cleanupDataDirectories() throws IOException {
+        for (String s : getDataDirs()) {
+            cleanDir(s);
+        }
+    }
+
+    /**
+     * Creates the data directories, if they didn't exist.
+     *
+     * @throws IOException if directories cannot be created (permissions etc).
+     */
+    public void makeDirsIfNotExist() throws IOException {
+        for (String s : getDataDirs()) {
+            mkdir(s);
+        }
+    }
+
+    /**
+     * Collects all data dirs and returns a set of String paths on the file system.
+     *
+     * @return
+     */
+    private Set<String> getDataDirs() {
+        Set<String> dirs = new HashSet<String>();
+        for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+            dirs.add(s);
+        }
+        dirs.add(DatabaseDescriptor.getCommitLogLocation());
+        dirs.add(DatabaseDescriptor.getSavedCachesLocation());
+        return dirs;
+    }
+
+    /**
+     * Creates a directory
+     *
+     * @param dir
+     * @throws IOException
+     */
+    private void mkdir(String dir) throws IOException {
+        FileUtils.createDirectory(dir);
+    }
+
+    /**
+     * Removes all directory content from the file system
+     *
+     * @param dir
+     * @throws IOException
+     */
+    private void cleanDir(String dir) throws IOException {
+        File dirFile = new File(dir);
+        if (dirFile.exists() && dirFile.isDirectory()) {
+            FileUtils.deleteRecursive(dirFile);
+        }
+    }
+
+    /**
+     * Returns the native port of the server.
+     * @return the port number.
+     */
+    public Integer getNativeTransportPort() {
+        return nativeTransportPort;
+    }
+
+    /**
+     * Returns the host name of the server.
+     * @return the host name (typically 127.0.0.1).
+     */
+    public String getHost() {
+        return host;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java
new file mode 100644
index 0000000..d4067ef
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Truncate;
+import com.datastax.driver.core.schemabuilder.Create;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.testtools.EmbeddedCassandraResource;
+import org.apache.storm.cassandra.trident.state.MapStateFactoryBuilder;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.FilterNull;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.Split;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class MapStateTest {
+
+    @ClassRule
+    public static final EmbeddedCassandraResource cassandra = new EmbeddedCassandraResource();
+
+    private static Logger logger = LoggerFactory.getLogger(MapStateTest.class);
+    private static Cluster cluster;
+    private Session session;
+
+    @Test
+    public void nonTransactionalStateTest() throws Exception {
+        StateFactory factory = MapStateFactoryBuilder.nontransactional(getCassandraConfig())
+                .withTable("words_ks", "words_table")
+                .withKeys("word")
+                .withJSONBinaryState("state")
+                .build();
+
+        wordsTest(factory);
+    }
+
+    @Test
+    public void transactionalStateTest() throws Exception {
+
+        Map config = new HashMap();
+        StateFactory factory = MapStateFactoryBuilder.transactional(getCassandraConfig())
+                .withTable("words_ks", "words_table")
+                .withKeys("word")
+                .withJSONBinaryState("state")
+                .build();
+
+        wordsTest(factory);
+    }
+
+    @Test
+    public void opaqueStateTest() throws Exception {
+
+        Map config = new HashMap();
+        StateFactory factory = MapStateFactoryBuilder.opaque(getCassandraConfig())
+                .withTable("words_ks", "words_table")
+                .withKeys("word")
+                .withJSONBinaryState("state")
+                .build();
+
+        wordsTest(factory);
+    }
+
+    public void wordsTest(StateFactory factory) throws Exception {
+
+        FixedBatchSpout spout = new FixedBatchSpout(
+                new Fields("sentence"), 3,
+                new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"),
+                new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"));
+        spout.setCycle(false);
+
+        TridentTopology topology = new TridentTopology();
+
+        TridentState wordCounts = topology.newStream("spout1", spout)
+                .each(new Fields("sentence"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Count(), new Fields("state"))
+                .parallelismHint(1);
+
+        LocalDRPC client = new LocalDRPC();
+        topology.newDRPCStream("words", client)
+                .each(new Fields("args"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("state"))
+                .each(new Fields("state"), new FilterNull())
+                .aggregate(new Fields("state"), new Sum(), new Fields("sum"));
+
+        LocalCluster cluster = new LocalCluster();
+        logger.info("Submitting topology.");
+        cluster.submitTopology("test", new HashMap(), topology.build());
+
+        logger.info("Waiting for something to happen.");
+        int count;
+        do {
+            Thread.sleep(2000);
+            count = session.execute(QueryBuilder.select().all().from("words_ks", "words_table"))
+                    .getAvailableWithoutFetching();
+            logger.info("Found {} records", count);
+        } while (count < 24);
+
+        logger.info("Starting queries.");
+        assertEquals("[[5]]", client.execute("words", "cat dog the man")); // 5
+        assertEquals("[[0]]", client.execute("words", "cat")); // 0
+        assertEquals("[[0]]", client.execute("words", "dog")); // 0
+        assertEquals("[[4]]", client.execute("words", "the")); // 4
+        assertEquals("[[1]]", client.execute("words", "man")); // 1
+
+        cluster.shutdown();
+
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        Cluster.Builder clusterBuilder = Cluster.builder();
+
+        // Add cassandra cluster contact points
+        clusterBuilder.addContactPoint(cassandra.getHost());
+        clusterBuilder.withPort(cassandra.getNativeTransportPort());
+
+        // Build cluster and connect
+        cluster = clusterBuilder.build();
+        session = cluster.connect();
+
+        createKeyspace("words_ks");
+        createTable("words_ks", "words_table",
+                column("word", DataType.varchar()),
+                column("state", DataType.blob()));
+
+    }
+
+    @After
+    public void tearDown() {
+        truncateTable("words_ks", "words_table");
+        session.close();
+    }
+
+    protected void createKeyspace(String keyspace) throws Exception {
+        // Create keyspace not supported in the current datastax driver
+        String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS "
+                + keyspace
+                + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
+        logger.info(createKeyspace);
+        if (!session.execute(createKeyspace)
+            .wasApplied()) {
+            throw new Exception("Did not create keyspace " + keyspace);
+        }
+    }
+
+    protected Config getCassandraConfig() {
+        Config cassandraConf = new Config();
+        cassandraConf.put(CassandraConf.CASSANDRA_NODES, cassandra.getHost());
+        cassandraConf.put(CassandraConf.CASSANDRA_PORT, cassandra.getNativeTransportPort());
+        cassandraConf.put(CassandraConf.CASSANDRA_KEYSPACE, "words_ks");
+        return cassandraConf;
+    }
+
+    protected void truncateTable(String keyspace, String table) {
+        Truncate truncate = QueryBuilder.truncate(keyspace, table);
+        session.execute(truncate);
+    }
+
+    protected void createTable(String keyspace, String table, Column key, Column... fields) {
+        Map<String, Object> replication = new HashMap<>();
+        replication.put("class", SimpleStrategy.class.getSimpleName());
+        replication.put("replication_factor", 1);
+
+        Create createTable = SchemaBuilder.createTable(keyspace, table)
+                .ifNotExists()
+                .addPartitionKey(key.name, key.type);
+        for (Column field : fields) {
+            createTable.addColumn(field.name, field.type);
+        }
+        logger.info(createTable.toString());
+        session.execute(createTable);
+    }
+
+    protected static Column column(String name, DataType type) {
+        Column column = new Column();
+        column.name = name;
+        column.type = type;
+        return column;
+    }
+
+    protected static class Column {
+        public String name;
+        public DataType type;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/resources/cassandra.yaml b/external/storm-cassandra/src/test/resources/cassandra.yaml
new file mode 100644
index 0000000..81d0e3f
--- /dev/null
+++ b/external/storm-cassandra/src/test/resources/cassandra.yaml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cluster_name: 'Test Cluster'
+initial_token: 0
+data_file_directories:
+    - ./target/cassandra/data
+commitlog_directory: ./target/cassandra/commitlog
+saved_caches_directory: ./target/cassandra/saved_caches
+
+listen_address: 127.0.0.1
+storage_port: 7000
+rpc_address: 127.0.0.1
+rpc_port: 7365
+start_native_transport: true
+native_transport_port: 9042
+
+commitlog_sync: periodic
+commitlog_sync_period_in_ms: 10000
+partitioner: org.apache.cassandra.dht.RandomPartitioner
+endpoint_snitch: SimpleSnitch
+seed_provider:
+    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+      parameters:
+          - seeds: "127.0.0.1"
\ No newline at end of file


[4/4] storm git commit: STORM-1369: CHANGELOG

Posted by ka...@apache.org.
STORM-1369: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08e7b640
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08e7b640
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08e7b640

Branch: refs/heads/master
Commit: 08e7b640d6f2e4d7cffad9f5ef64248e65e52739
Parents: 80d22b8
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 5 09:45:43 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 5 09:45:43 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/08e7b640/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f3afd24..d7d0b0d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 \ufeff## 2.0.0
+ * STORM-1369: Add MapState implementation to storm-cassandra
  * STORM-2432: Storm-Kafka-Client Trident Spout Seeks Incorrect Offset With UNCOMMITTED_LATEST Strategy 
  * STORM-2427: Fix event logger enable disable UI buttons
  * STORM-2425: Storm Hive Bolt not closing open transactions


[2/4] storm git commit: STORM-1369: Add MapState implementation to storm-cassandra.

Posted by ka...@apache.org.
STORM-1369: Add MapState implementation to storm-cassandra.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eb6c3089
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eb6c3089
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eb6c3089

Branch: refs/heads/master
Commit: eb6c308925b2a4cd89031a2a6b951976deb6262f
Parents: b7fe860
Author: Magnus Koch <ma...@ef.com>
Authored: Mon Mar 6 11:39:34 2017 +0100
Committer: Magnus Koch <ma...@ef.com>
Committed: Mon Mar 6 11:39:34 2017 +0100

----------------------------------------------------------------------
 external/storm-cassandra/README.md              | 159 +++++++++---
 external/storm-cassandra/pom.xml                |   9 +
 .../storm/cassandra/executor/AsyncExecutor.java | 167 ++++++++++++-
 .../executor/AsyncExecutorProvider.java         |   2 +-
 .../executor/AsyncResultSetHandler.java         |  58 +++++
 .../query/AyncCQLResultSetValuesMapper.java     |  36 +++
 .../trident/state/CassandraBackingMap.java      | 241 +++++++++++++++++++
 .../trident/state/CassandraMapStateFactory.java | 106 ++++++++
 .../trident/state/MapStateFactoryBuilder.java   | 226 +++++++++++++++++
 .../state/NonTransactionalTupleStateMapper.java |  64 +++++
 .../trident/state/OpaqueTupleStateMapper.java   | 127 ++++++++++
 .../trident/state/SerializedStateMapper.java    |  67 ++++++
 .../trident/state/SimpleStateMapper.java        | 104 ++++++++
 .../cassandra/trident/state/SimpleTuple.java    | 213 ++++++++++++++++
 .../cassandra/trident/state/StateMapper.java    |  35 +++
 .../state/TransactionalTupleStateMapper.java    | 105 ++++++++
 .../TridentAyncCQLResultSetValuesMapper.java    | 117 +++++++++
 .../testtools/EmbeddedCassandraResource.java    | 193 +++++++++++++++
 .../storm/cassandra/trident/MapStateTest.java   | 230 ++++++++++++++++++
 .../src/test/resources/cassandra.yaml           |  39 +++
 20 files changed, 2261 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 1edf708..9f543c9 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -3,13 +3,13 @@ Storm Cassandra Integration (CQL).
 
 [Apache Storm](https://storm.apache.org/) is a free and open source distributed realtime computation system.
 
-### Bolt API implementation for Apache Cassandra
+## Bolt API implementation for Apache Cassandra
 
 This library provides core storm bolt on top of Apache Cassandra.
 Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*.
 
 
-### Configuration
+## Configuration
 The following properties may be passed to storm configuration.
 
 | **Property name**                            | **Description** | **Default**         |
@@ -25,17 +25,17 @@ The following properties may be passed to storm configuration.
 | **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)            |
 | **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)          |
 
-### CassandraWriterBolt
+## CassandraWriterBolt
 
-####Static import
+###Static import
 ```java
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.*
 
 ```
 
-#### Insert Query Builder
-##### Insert query including only the specified tuple fields.
+### Insert Query Builder
+#### Insert query including only the specified tuple fields.
 ```java
 
     new CassandraWriterBolt(
@@ -48,7 +48,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query including all tuple fields.
+#### Insert query including all tuple fields.
 ```java
 
     new CassandraWriterBolt(
@@ -59,7 +59,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert multiple queries from one input tuple.
+#### Insert multiple queries from one input tuple.
 ```java
 
     new CassandraWriterBolt(
@@ -70,7 +70,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query using QueryBuilder
+#### Insert query using QueryBuilder
 ```java
 
     new CassandraWriterBolt(
@@ -81,7 +81,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     )
 ```
 
-##### Insert query with static bound query
+#### Insert query with static bound query
 ```java
 
     new CassandraWriterBolt(
@@ -92,7 +92,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query with static bound query using named setters and aliases
+#### Insert query with static bound query using named setters and aliases
 ```java
 
     new CassandraWriterBolt(
@@ -109,7 +109,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
     );
 ```
 
-##### Insert query with bound statement load from storm configuration
+#### Insert query with bound statement load from storm configuration
 ```java
 
     new CassandraWriterBolt(
@@ -117,7 +117,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
             .bind(all());
 ```
 
-##### Insert query with bound statement load from tuple field
+#### Insert query with bound statement load from tuple field
 ```java
 
     new CassandraWriterBolt(
@@ -125,7 +125,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
             .bind(all());
 ```
 
-##### Insert query with batch statement
+#### Insert query with batch statement
 ```java
 
     // Logged
@@ -202,34 +202,127 @@ builder.setBolt("BOLT_WRITER", bolt, 4)
         .customGrouping("spout", new Murmur3StreamGrouping("title"))
 ```
 
-### Trident API support
-storm-cassandra support Trident `state` API for `inserting` data into Cassandra. 
+## Trident State Support
+
+For a state factory which writes output to Cassandra, use ```CassandraStateFactory``` with an ```INSERT INTO``` statement:
+
 ```java
-        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+
+        // Build state
         CQLStatementTupleMapper insertTemperatureValues = boundQuery(
                 "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
-                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
-        options.withCQLStatementTupleMapper(insertTemperatureValues);
+                .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
+                .build();
+
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext())
+                .withCQLStatementTupleMapper(insertTemperatureValues);
+
         CassandraStateFactory insertValuesStateFactory =  new CassandraStateFactory(options);
-        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
-        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
-        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
-        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+        
+        // Use state in existing stream
+        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater());
+
+```
+
+For a state factory which can query Cassandra, use ```CassandraStateFactory``` with a ```SELECT``` statement:
+
+```java
+
+        // Build state
+        CQLStatementTupleMapper selectStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+                .bind(field("weather_station_id").as("id"))
+                .build();
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext())
+                .withCQLStatementTupleMapper(selectStationName)
+                .withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+        CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
+        
+        // Append query to existing stream
+        stream.stateQuery(selectWeatherStationStateFactory, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+
+```
+
+## Trident MapState Support
+
+For a MapState with Cassandra IBackingMap, the simplest option is to use a ```MapStateFactoryBuilder``` which generates CQL statements automatically. 
+The builder supports opaque, transactional and non-transactional map states.
+
+To store values in Cassandra you need to provide a ```StateMapper``` that maps the value to fields.  
+
+For simple values, the ```SimpleStateMapper``` can be used:
+
+```java
+        StateFactory mapState = MapStateFactoryBuilder.opaque()
+                .withTable("mykeyspace", "year_month_state")
+                .withKeys("year", "month")
+                .withStateMapper(SimpleStateMapper.opaque("txid", "sum", "prevSum"))
+                .build();
 ```
 
-Below `state` API for `querying` data from Cassandra.
+For complex values you can either custom build a state mapper, or use binary serialization:
+
+```java
+        StateFactory mapState = MapStateFactoryBuilder.opaque()
+                .withTable("mykeyspace", "year_month_state")
+                .withKeys("year", "month")
+                .withJSONBinaryState("state")
+                .build();
+```
+
+The JSONBinary methods use the storm JSON serializers, but you can also provide custom serializers if you want.
+
+For instance, the ```NonTransactionalTupleStateMapper```, ```TransactionalTupleStateMapper``` or ```OpaqueTupleStateMapper```
+classes can be used if the map state uses tuples as values.
+
 ```java
-        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
-        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
-                 .bind(with(field("weather_station_id").as("id")));
-        options.withCQLStatementTupleMapper(insertTemperatureValues);
-        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
-        CassandraStateFactory selectWeatherStationStateFactory =  new CassandraStateFactory(options);
-        CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
-        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
-        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));         
+        StateFactory mapState = MapStateFactoryBuilder.<ITuple>nontransactional()
+                .withTable("mykeyspace", "year_month_state")
+                .withKeys("year", "month")
+                .withStateMapper(new NonTransactionalTupleStateMapper("latest_value"))
+                .build();
 ```
 
+Alternatively, you can construct a ```CassandraMapStateFactory``` yourself:
+
+```java
+
+        CQLStatementTupleMapper get = simpleQuery("SELECT state FROM words_ks.words_table WHERE word = ?")
+                .with(fields("word"))
+                .build();
+
+        CQLStatementTupleMapper put = simpleQuery("INSERT INTO words_ks.words_table (word, state) VALUES (?, ?)")
+                .with(fields("word", "state"))
+                .build();
+
+        CassandraBackingMap.Options<Integer> mapStateOptions = new CassandraBackingMap.Options<Integer>(new CassandraContext())
+                .withBatching(BatchStatement.Type.UNLOGGED)
+                .withKeys(new Fields("word"))
+                .withNonTransactionalJSONBinaryState("state")
+                .withMultiGetCQLStatementMapper(get)
+                .withMultiPutCQLStatementMapper(put);
+
+        CassandraMapStateFactory factory = CassandraMapStateFactory.nonTransactional(mapStateOptions)
+                .withCache(0);
+
+```
+
+### MapState Parallelism
+
+The backing map implementation submits queries (gets and puts) in parallel to the Cassandra cluster.
+The default number of parallel requests is based on the driver configuration, which ends up being 128 with
+default driver configuration. The maximum parallelism applies to the cluster as a whole, and to each 
+state instance (per worker, not executor).
+
+The default calculation is:
+  default = min(max local, max remote) / 2
+  
+which normally means:
+  min(1024, 256) / 2 = 128
+
+This is deliberately conservative to avoid issues in most setups. If this does not provide sufficient 
+throughput you can either explicitly override the max parallelism on the state builder/factory/backingmap, 
+or you can update the driver configuration.
+
 ## License
 
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 0eff9f6..ed7ccf4 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -38,6 +38,7 @@
         <guava.version>16.0.1</guava.version>
         <commons-lang3.version>3.3</commons-lang3.version>
         <cassandra.driver.core.version>3.1.2</cassandra.driver.core.version>
+        <cassandra.version>2.1.7</cassandra.version>
     </properties>
 
     <developers>
@@ -70,6 +71,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>cassandra-all</artifactId>
+            <version>${cassandra.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
@@ -100,5 +108,6 @@
             <version>1.10.19</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
index 5366c81..63b81fe 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
@@ -22,17 +22,22 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.storm.topology.FailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -106,6 +111,7 @@ public class AsyncExecutor<T> implements Serializable {
     public SettableFuture<T> execAsync(final Statement statement, final T inputs) {
         return execAsync(statement, inputs, handler);
     }
+
     /**
      * Asynchronously executes the specified batch statement. Inputs will be passed to
      * the {@link #handler} once query succeed or failed.
@@ -138,6 +144,117 @@ public class AsyncExecutor<T> implements Serializable {
     }
 
     /**
+     * Asynchronously executes the specified statements, one request per statement, using
+     * {@code throttle} to bound the number of requests that are in flight at the same time.
+     *
+     * <p>{@code statements.get(i)} is executed on behalf of {@code inputs.get(i)} and the outcome of
+     * each request is passed to the given {@link AsyncResultSetHandler}. The returned future completes
+     * with {@code inputs} once every statement has been executed, or fails with a
+     * {@link MultiFailedException} when at least one request or handler callback failed. After the
+     * first recorded failure no further requests are started; the remaining statements are skipped.
+     *
+     * @param statements the statements to execute
+     * @param inputs the inputs associated with the statements, in the same order
+     * @param throttle semaphore limiting the number of concurrently executing requests
+     * @param handler callback notified of every individual success or failure
+     * @return a future completed once all statements have been executed or skipped
+     * @throws IllegalArgumentException if {@code inputs} is non-empty and its size differs from
+     *         the size of {@code statements}
+     */
+    public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Semaphore throttle, final AsyncResultSetHandler<T> handler) {
+
+        final SettableFuture<List<T>> settableFuture = SettableFuture.create();
+        if (inputs.isEmpty()) {
+            settableFuture.set(new ArrayList<T>());
+            return settableFuture;
+        }
+        if (statements.size() != inputs.size()) {
+            // Completion is counted per input while the loop below walks the statements;
+            // a size mismatch would leave the future incomplete forever.
+            throw new IllegalArgumentException(String.format(
+                    "Expected one input per statement but got %d statements and %d inputs",
+                    statements.size(), inputs.size()));
+        }
+
+        final AsyncContext<T> asyncContext = new AsyncContext<>(inputs, throttle, settableFuture);
+        for (int i = 0; i < statements.size(); i++) {
+
+            // Acquire a slot. Once a failure has been recorded acquire() returns false and
+            // accounts for the statement as skipped.
+            if (asyncContext.acquire()) {
+                try {
+                    pending.incrementAndGet();
+                    final T input = inputs.get(i);
+                    final Statement statement = statements.get(i);
+                    ResultSetFuture future = session.executeAsync(statement);
+                    Futures.addCallback(future, new FutureCallback<ResultSet>() {
+                        @Override
+                        public void onSuccess(ResultSet result) {
+                            try {
+                                handler.success(input, result);
+                            } catch (Throwable throwable) {
+                                asyncContext.exception(throwable);
+                            } finally {
+                                pending.decrementAndGet();
+                                asyncContext.release();
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Throwable throwable) {
+                            try {
+                                handler.failure(throwable, input);
+                            } catch (Throwable throwable2) {
+                                asyncContext.exception(throwable2);
+                            }
+                            finally {
+                                asyncContext
+                                        .exception(throwable)
+                                        .release();
+                                pending.decrementAndGet();
+                                LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable);
+                            }
+                        }
+                    }, executorService);
+                } catch (Throwable throwable) {
+                    asyncContext.exception(throwable)
+                            .release();
+                    pending.decrementAndGet();
+                    // Deliberately no break: the remaining statements still have to pass through
+                    // acquire() so that they are counted as skipped. Leaving the loop here would
+                    // keep the latch above zero and the returned future would never complete.
+                }
+            }
+
+        }
+
+        return settableFuture;
+    }
+
+    /**
+     * Book-keeping shared by all requests of a single multi-statement execution: counts the
+     * statements that are still outstanding, collects failures and completes the result future
+     * when the last statement has been accounted for.
+     */
+    private static class AsyncContext<T> {
+        private final List<T> inputs;
+        private final SettableFuture<List<T>> future;
+        private final AtomicInteger latch;
+        private final List<Throwable> exceptions;
+        private final Semaphore throttle;
+
+        public AsyncContext(List<T> inputs, Semaphore throttle, SettableFuture<List<T>> settableFuture) {
+            this.inputs = inputs;
+            this.latch = new AtomicInteger(inputs.size());
+            this.throttle = throttle;
+            this.exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+            this.future = settableFuture;
+        }
+
+        /**
+         * Waits for a free request slot.
+         *
+         * @return {@code true} if the caller may start a request (and must call {@link #release()}
+         *         afterwards); {@code false} if a failure was already recorded, in which case the
+         *         statement is counted as skipped and the slot is handed back immediately
+         */
+        public boolean acquire() {
+            throttle.acquireUninterruptibly();
+            // Don't start new requests if there is an exception
+            if (!exceptions.isEmpty()) {
+                // A skipped statement may be the last outstanding one, so it has to be able
+                // to complete the future as well.
+                countDown();
+                throttle.release();
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Marks one started request as finished and frees its slot.
+         */
+        public AsyncContext<T> release() {
+            countDown();
+            throttle.release();
+            return this;
+        }
+
+        /**
+         * Records a failure; the result future will fail once all statements are accounted for.
+         */
+        public AsyncContext<T> exception(Throwable throwable) {
+            this.exceptions.add(throwable);
+            return this;
+        }
+
+        /**
+         * Accounts for one statement and completes the future when it was the last one.
+         */
+        private void countDown() {
+            if (latch.decrementAndGet() == 0) {
+                if (exceptions.isEmpty()) {
+                    future.set(inputs);
+                } else {
+                    future.setException(new MultiFailedException(exceptions));
+                }
+            }
+        }
+    }
+
+    /**
      * Returns the number of currently executed tasks which are not yet completed.
      */
     public int getPendingTasksSize() {
@@ -150,4 +267,48 @@ public class AsyncExecutor<T> implements Serializable {
             this.executorService.shutdownNow();
         }
     }
+
+    /**
+     * A {@link FailedException} that carries every failure collected during a multi-statement
+     * execution. The first collected failure is used as the exception cause.
+     */
+    public static class MultiFailedException extends FailedException {
+        private final List<Throwable> exceptions;
+
+        /**
+         * @param exceptions the collected failures; must contain at least one element because the
+         *                   first one becomes the cause
+         */
+        public MultiFailedException(List<Throwable> exceptions) {
+            super(getMessage(exceptions), exceptions.get(0));
+            this.exceptions = exceptions;
+        }
+
+        /**
+         * Builds the exception message from the messages of at most the first five failures.
+         */
+        private static String getMessage(List<Throwable> exceptions) {
+            int top5 = Math.min(exceptions.size(), 5);
+            StringBuilder sb = new StringBuilder();
+            sb.append("First ")
+                    .append(top5)
+                    .append(" exceptions: ")
+                    .append(System.lineSeparator());
+            for (int i = 0; i < top5; i++) {
+                sb.append(exceptions.get(i).getMessage())
+                        .append(System.lineSeparator());
+            }
+            return sb.toString();
+        }
+
+        /**
+         * Returns the exception message followed by the string form of every collected failure.
+         */
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append(getMessage())
+                    .append(System.lineSeparator())
+                    .append("Multiple exceptions encountered: ")
+                    .append(System.lineSeparator());
+
+            for (Throwable exception : exceptions) {
+                sb.append(exception.toString())
+                        .append(System.lineSeparator());
+            }
+
+            // Return the assembled description; delegating to super.toString() here would
+            // throw away everything appended above.
+            return sb.toString();
+        }
+
+        /**
+         * @return all failures that were collected, in the order they were recorded
+         */
+        public List<Throwable> getExceptions() {
+            return exceptions;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
index 0c684c0..f4b7277 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
@@ -31,7 +31,7 @@ public class AsyncExecutorProvider {
      * Returns a new {@link AsyncExecutor} per storm executor.
      */
     public static <T> AsyncExecutor getLocal(Session session, AsyncResultHandler<T> handler) {
-        AsyncExecutor<T> executor = localAsyncExecutor.get();
+        AsyncExecutor<T> executor = localAsyncExecutor.<T>get();
         if( executor == null ) {
             localAsyncExecutor.set(executor = new AsyncExecutor<>(session, handler));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
new file mode 100644
index 0000000..8ccb400
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+
+import java.io.Serializable;
+
+/**
+ * Callback that receives the outcome of an asynchronously executed statement together with the
+ * input the statement was executed for.
+ *
+ * @param <T> type of the input associated with a statement
+ */
+public interface AsyncResultSetHandler<T> extends Serializable {
+
+    /**
+     * Handler that ignores both successful and failed executions.
+     */
+    AsyncResultSetHandler NO_OP_HANDLER = new AsyncResultSetHandler() {
+        @Override
+        public void success(Object inputs, ResultSet resultSet) {
+            // intentionally does nothing
+        }
+
+        @Override
+        public void failure(Throwable t, Object inputs) {
+            // intentionally does nothing
+        }
+    };
+
+    /**
+     * Invoked when the statement for the given inputs completed successfully.
+     *
+     * @param inputs The inputs the statement was executed for.
+     * @param resultSet The result returned for the statement.
+     */
+    void success(T inputs, ResultSet resultSet);
+
+    /**
+     * Invoked when the statement for the given inputs failed.
+     *
+     * @param t The cause of the failure.
+     * @param inputs The inputs the statement was executed for.
+     */
+    void failure(Throwable t, T inputs);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
new file mode 100644
index 0000000..9b92b99
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A result set mapper that takes a whole list of statements at once and returns one list of
+ * {@link Values} per entry of the outer result list.
+ *
+ * NOTE(review): the "Aync" prefix suggests implementations execute the statements asynchronously
+ * and in parallel; nothing in this interface enforces that, confirm against the implementation.
+ */
+public interface AyncCQLResultSetValuesMapper extends Serializable {
+
+    /**
+     * Maps the given statements to lists of {@link Values}.
+     *
+     * @param session the Cassandra session made available to the mapper
+     * @param statements the statements to map
+     * @param tuples the tuples belonging to the statements; presumably index-aligned with
+     *               {@code statements} and optional for write-only use (TODO confirm)
+     * @return the mapped values; presumably one inner list per statement (TODO confirm)
+     */
+    List<List<Values>> map(Session session, List<Statement> statements, List<ITuple> tuples);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
new file mode 100644
index 0000000..82f3b9c
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.IBackingMap;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+/**
+ * An {@link IBackingMap} implementation for Cassandra.
+ *
+ * The implementation stores state as a binary blob in cassandra using a {@link Serializer}.
+ * It supports Opaque, Transactional and NonTransactional states, given a matching serializer.
+ *
+ * Configuration is done with three separate constructs:
+ *  - One tuple mapper for multiGet, which should map keys to a select statement and return {@link Values}.
+ *  - One state mapper, which maps the state to/from a {@link Values} representation, which is used for binding.
+ *  - One tuple mapper for multiPut, which should map {@link Values} to an INSERT or UPDATE statement.
+ *
+ * {@link #multiPut(List, List)} updates Cassandra with parallel statements.
+ * {@link #multiGet(List)} queries Cassandra with parallel statements.
+ *
+ * Parallelism defaults to half the maximum requests per host, either local or remote whichever is
+ * lower. The driver defaults to 256 for remote hosts and 1024 for local hosts, so the default value is 128
+ * unless the driver is configured otherwise.
+ *
+ * @param <T> the type of the state values read from and written to the map
+ */
+public class CassandraBackingMap<T> implements IBackingMap<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraBackingMap.class);
+
+    private final Map conf;
+    private final Options<T> options;
+    private final Fields allFields;
+
+    private SimpleClient client;
+    private Session session;
+    private AyncCQLResultSetValuesMapper getResultMapper;
+    private AyncCQLResultSetValuesMapper putResultMapper;
+    private Semaphore throttle;
+
+
+    /**
+     * Creates a backing map for the given configuration and options.
+     *
+     * The combined field list used for writes consists of the key fields followed by the
+     * state fields reported by the configured state mapper.
+     *
+     * @param conf configuration handed to the client provider and the statement mappers
+     * @param options backing map options; key fields and state mapper must already be set
+     */
+    protected CassandraBackingMap(Map conf, Options<T> options) {
+        this.conf = conf;
+        this.options = options;
+
+        List<String> fieldNames = options.keyFields.toList();
+        List<String> stateFieldNames = options.stateMapper.getStateFields().toList();
+        fieldNames.addAll(stateFieldNames);
+        this.allFields = new Fields(fieldNames);
+    }
+
+    public void prepare() {
+        LOG.info("Preparing state for {}", options.toString());
+        Preconditions.checkNotNull(options.getMapper, "CassandraBackingMap.Options should have getMapper");
+        Preconditions.checkNotNull(options.putMapper, "CassandraBackingMap.Options should have putMapper");
+        client = options.clientProvider.getClient(conf);
+        session = client.connect();
+        if (options.maxParallelism == null || options.maxParallelism <= 0) {
+            PoolingOptions po = session.getCluster().getConfiguration().getPoolingOptions();
+            Integer maxRequestsPerHost = Math.min(
+                    po.getMaxConnectionsPerHost(HostDistance.LOCAL) * po.getMaxRequestsPerConnection(HostDistance.LOCAL),
+                    po.getMaxConnectionsPerHost(HostDistance.REMOTE) * po.getMaxRequestsPerConnection(HostDistance.REMOTE)
+            );
+            options.maxParallelism = maxRequestsPerHost / 2;
+            LOG.info("Parallelism default set to {}", options.maxParallelism);
+        }
+        throttle = new Semaphore(options.maxParallelism, false);
+        this.getResultMapper = new TridentAyncCQLResultSetValuesMapper(options.stateMapper.getStateFields(), throttle);
+        this.putResultMapper = new TridentAyncCQLResultSetValuesMapper(null, throttle);
+    }
+
+    @Override
+    public List<T> multiGet(List<List<Object>> keys) {
+        LOG.debug("multiGet fetching {} values.", keys.size());
+        List<Statement> selects = new ArrayList<>();
+        List<ITuple> keyTuples = new ArrayList<>();
+
+        for (int i = 0; i < keys.size(); i++) {
+            SimpleTuple keyTuple = new SimpleTuple(options.keyFields, keys.get(i));
+            List<Statement> mappedStatements = options.getMapper.map(conf, session, keyTuple);
+            if (mappedStatements.size() > 1) {
+                throw new IllegalArgumentException("Only one statement per map state item is supported.");
+            }
+            selects.add(mappedStatements.size() == 1 ? mappedStatements.get(0) : null);
+            keyTuples.add(keyTuple);
+        }
+
+        List<List<Values>> results = getResultMapper
+                .map(session, selects, keyTuples);
+
+        List<T> states = new ArrayList<>();
+        for (List<Values> values : results) {
+            T state = (T) options.stateMapper.fromValues(values);
+            states.add(state);
+        }
+
+        return states;
+
+    }
+
+    /**
+     * Writes the given values for the given keys.
+     *
+     * Each key is combined with the mapped representation of its value into a tuple, which the
+     * configured put mapper turns into statements; all statements are then handed to the put
+     * result mapper.
+     *
+     * @param keys the keys to write
+     * @param values the state values, index-aligned with {@code keys}
+     * @throws FailedException if the write fails
+     */
+    @Override
+    public void multiPut(List<List<Object>> keys, List<T> values) {
+        LOG.debug("multiPut writing {} values.", keys.size());
+
+        List<Statement> puts = new ArrayList<>();
+        for (int index = 0; index < keys.size(); index++) {
+            Values mappedState = options.stateMapper.toValues(values.get(index));
+            SimpleTuple row = new SimpleTuple(allFields, keys.get(index), mappedState);
+            List<Statement> rowStatements = options.putMapper.map(conf, session, row);
+            puts.addAll(rowStatements);
+        }
+
+        try {
+            putResultMapper.map(session, puts, null);
+        } catch (Exception writeFailure) {
+            LOG.warn("Write operation failed: {}", writeFailure.getMessage());
+            throw new FailedException(writeFailure);
+        }
+    }
+
+    /**
+     * Fluent configuration for a {@link CassandraBackingMap}: the client provider, the key fields,
+     * the state mapper, the statement mappers for reads and writes and the maximum number of
+     * parallel requests.
+     *
+     * @param <T> the type of the state values
+     */
+    public static final class Options<T> implements Serializable {
+        private final SimpleClientProvider clientProvider;
+        private Fields keyFields;
+        private StateMapper stateMapper;
+        private CQLStatementTupleMapper getMapper;
+        private CQLStatementTupleMapper putMapper;
+        private Integer maxParallelism = 128;
+
+        /**
+         * @param clientProvider provider used to obtain the Cassandra client
+         */
+        public Options(SimpleClientProvider clientProvider) {
+            this.clientProvider = clientProvider;
+        }
+
+        /** Sets the fields that make up the key of a state entry. */
+        public Options<T> withKeys(Fields keyFields) {
+            this.keyFields = keyFields;
+            return this;
+        }
+
+        /** Sets a custom mapper between state objects and their {@link Values} representation. */
+        public Options<T> withStateMapper(StateMapper<T> stateMapper) {
+            this.stateMapper = stateMapper;
+            return this;
+        }
+
+        /** Stores non-transactional state in the given field using {@link JSONNonTransactionalSerializer}. */
+        public Options<T> withNonTransactionalJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONNonTransactionalSerializer());
+            return this;
+        }
+
+        /** Stores non-transactional state in the given field using the given serializer. */
+        public Options<T> withNonTransactionalBinaryState(String fieldName, Serializer<T> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        /** Stores transactional state in the given field using {@link JSONTransactionalSerializer}. */
+        public Options<T> withTransactionalJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONTransactionalSerializer());
+            return this;
+        }
+
+        /** Stores transactional state in the given field using the given serializer. */
+        public Options<T> withTransactionalBinaryState(String fieldName, Serializer<TransactionalValue<T>> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        /** Stores opaque state in the given field using {@link JSONOpaqueSerializer}. */
+        public Options<T> withOpaqueJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONOpaqueSerializer());
+            return this;
+        }
+
+        /** Stores opaque state in the given field using the given serializer. */
+        public Options<T> withOpaqueBinaryState(String fieldName, Serializer<OpaqueValue<T>> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        /** Sets the mapper that turns a key tuple into the statement used for reading. */
+        public Options<T> withGetMapper(CQLStatementTupleMapper getMapper) {
+            this.getMapper = getMapper;
+            return this;
+        }
+
+        /** Sets the mapper that turns a key-and-state tuple into the statements used for writing. */
+        public Options<T> withPutMapper(CQLStatementTupleMapper putMapper) {
+            this.putMapper = putMapper;
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of parallel requests. A {@code null} or non-positive value makes
+         * {@link CassandraBackingMap#prepare()} derive the value from the driver's pooling options.
+         */
+        public Options<T> withMaxParallelism(Integer maxParallelism) {
+            this.maxParallelism = maxParallelism;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            // The closing bracket was missing from the format string, leaving the output unbalanced.
+            return String.format("%s: [keys: %s, StateMapper: %s, getMapper: %s, putMapper: %s, maxParallelism: %d]",
+                    this.getClass().getSimpleName(),
+                    keyFields,
+                    stateMapper,
+                    getMapper,
+                    putMapper,
+                    maxParallelism
+            );
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
new file mode 100644
index 0000000..abd9477
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.CachedMap;
+import org.apache.storm.trident.state.map.IBackingMap;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.NonTransactionalMap;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
+
+import java.util.Map;
+
+/**
+ * {@link StateFactory} producing a {@link MapState} that is backed by a {@link CassandraBackingMap}.
+ *
+ * Opaque, transactional and non-transactional states are supported; use the matching static
+ * factory method. The backing map is wrapped in a {@link CachedMap} only when a positive cache
+ * size has been set through {@link #withCache} (no cache by default).
+ *
+ */
+public class CassandraMapStateFactory implements StateFactory {
+
+    private final StateType stateType;
+    private final CassandraBackingMap.Options options;
+    private int cacheSize;
+    private Map cassandraConfig;
+
+    private CassandraMapStateFactory(StateType stateType, CassandraBackingMap.Options options, Map cassandraConfig) {
+        this.stateType = stateType;
+        this.options = options;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    /** Creates a factory for opaque map states. */
+    public static CassandraMapStateFactory opaque(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.OPAQUE, options, cassandraConfig);
+    }
+
+    /** Creates a factory for transactional map states. */
+    public static CassandraMapStateFactory transactional(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.TRANSACTIONAL, options, cassandraConfig);
+    }
+
+    /** Creates a factory for non-transactional map states. */
+    public static CassandraMapStateFactory nonTransactional(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.NON_TRANSACTIONAL, options, cassandraConfig);
+    }
+
+    /**
+     * Sets the size of the cache placed in front of the backing map; values of zero or less
+     * leave the backing map uncached.
+     */
+    public CassandraMapStateFactory withCache(int cacheSize) {
+        this.cacheSize = cacheSize;
+        return this;
+    }
+
+    /**
+     * Builds and prepares a new backing map from the Cassandra configuration given to this
+     * factory (the {@code conf} argument is not consulted) and wraps it in the map state type
+     * this factory was created for.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+
+        CassandraBackingMap cassandraBackingMap = new CassandraBackingMap(cassandraConfig, options);
+        cassandraBackingMap.prepare();
+
+        IBackingMap backingMap;
+        if (cacheSize > 0) {
+            backingMap = new CachedMap<>(cassandraBackingMap, cacheSize);
+        } else {
+            backingMap = cassandraBackingMap;
+        }
+
+        switch (stateType) {
+            case OPAQUE:
+                return OpaqueMap.build((IBackingMap<OpaqueValue>) backingMap);
+
+            case TRANSACTIONAL:
+                return TransactionalMap.build((IBackingMap<TransactionalValue>) backingMap);
+
+            case NON_TRANSACTIONAL:
+                return NonTransactionalMap.build(backingMap);
+
+            default:
+                throw new IllegalArgumentException("Invalid state provided " + stateType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
new file mode 100644
index 0000000..c371fdb
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.Select;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+
+/**
+ * A helper for building a MapState backed by Cassandra. It internalizes some common
+ * implementation choices to simplify usage.
+ *
+ * <p>In the simplest use case, a map state can be constructed with:
+ *
+ * <pre>{@code
+ * StateFactory mapState = MapStateFactoryBuilder.opaque(cassandraConf)
+ *     .withTable("mykeyspace", "year_month_state")
+ *     .withKeys("year", "month")
+ *     .withJSONBinaryState("state")
+ *     .build();
+ * }</pre>
+ *
+ * <p>for a cassandra table with:
+ *
+ * <pre>
+ * mykeyspace.year_month_state {
+ *     year: int,
+ *     month: int,
+ *     state: blob
+ * }
+ * </pre>
+ *
+ * <p>This will use the storm JSON serializers to convert the state to and from binary format.
+ * Other binary serializers can be used with the {@link #withBinaryState(String, Serializer)} method.
+ *
+ * <p>Storing state in explicit fields (e.g. in a field "sum" of type int) is possible by instead calling
+ * {@link #withStateMapper(StateMapper)}. For instance, you can use {@link NonTransactionalTupleStateMapper},
+ * {@link TransactionalTupleStateMapper} or {@link OpaqueTupleStateMapper} if your state values are tuples.
+ *
+ * @param <T> the type of the values stored in the state
+ */
+public class MapStateFactoryBuilder<T> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MapStateFactoryBuilder.class);
+
+    private String keyspace;
+    private String table;
+    private String[] keys;
+    private Integer maxParallelism;
+    private StateType stateType;
+    private StateMapper<T> stateMapper;
+    private Map cassandraConfig;
+    private int cacheSize;
+
+    /**
+     * Starts a builder for an opaque map state.
+     *
+     * @param cassandraConf the Cassandra configuration passed on to the created factory
+     */
+    public static <U> MapStateFactoryBuilder<OpaqueValue<U>> opaque(Map cassandraConf) {
+        return new MapStateFactoryBuilder<OpaqueValue<U>>()
+                .withStateType(StateType.OPAQUE)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /**
+     * Starts a builder for a transactional map state.
+     *
+     * @param cassandraConf the Cassandra configuration passed on to the created factory
+     */
+    public static <U> MapStateFactoryBuilder<TransactionalValue<U>> transactional(Map cassandraConf) {
+        return new MapStateFactoryBuilder<TransactionalValue<U>>()
+                .withStateType(StateType.TRANSACTIONAL)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /**
+     * Starts a builder for a non-transactional map state.
+     *
+     * @param cassandraConf the Cassandra configuration passed on to the created factory
+     */
+    public static <U> MapStateFactoryBuilder<U> nontransactional(Map cassandraConf) {
+        return new MapStateFactoryBuilder<U>()
+                .withStateType(StateType.NON_TRANSACTIONAL)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /** Sets the keyspace and table that hold the state. Both are required by {@link #build()}. */
+    public MapStateFactoryBuilder<T> withTable(String keyspace, String table) {
+        this.keyspace = keyspace;
+        this.table = table;
+        return this;
+    }
+
+    /** Sets the key columns used to look up state rows. At least one is required by {@link #build()}. */
+    public MapStateFactoryBuilder<T> withKeys(String... keys) {
+        this.keys = keys;
+        return this;
+    }
+
+    /** Sets the maximum parallelism forwarded to the backing map options. */
+    public MapStateFactoryBuilder<T> withMaxParallelism(Integer maxParallelism) {
+        this.maxParallelism = maxParallelism;
+        return this;
+    }
+
+    /**
+     * Stores the state as a single binary column, serialized with the storm JSON
+     * serializer that matches this builder's state type.
+     *
+     * @param stateField the name of the column holding the serialized state
+     * @throws IllegalArgumentException if the state type is not one of the handled types
+     */
+    @SuppressWarnings("unchecked")
+    public MapStateFactoryBuilder<T> withJSONBinaryState(String stateField) {
+        // Fail with a readable message instead of the bare NPE a switch on null would raise.
+        Objects.requireNonNull(stateType, "A state type must be specified.");
+        switch (stateType) {
+            case OPAQUE:
+                return withBinaryState(stateField, (Serializer) new JSONOpaqueSerializer());
+            case TRANSACTIONAL:
+                return withBinaryState(stateField, (Serializer) new JSONTransactionalSerializer());
+            case NON_TRANSACTIONAL:
+                return withBinaryState(stateField, new JSONNonTransactionalSerializer());
+            default:
+                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
+        }
+    }
+
+    /** Sets the mapper that converts state values to and from table columns. Required by {@link #build()}. */
+    public MapStateFactoryBuilder<T> withStateMapper(StateMapper<T> stateMapper) {
+        this.stateMapper = stateMapper;
+        return this;
+    }
+
+    /**
+     * Stores the state as a single binary column using the given serializer.
+     *
+     * @param stateField the name of the column holding the serialized state
+     * @param serializer the serializer used to convert the state to and from bytes
+     */
+    public MapStateFactoryBuilder<T> withBinaryState(String stateField, Serializer<T> serializer) {
+        return withStateMapper(new SerializedStateMapper<>(stateField, serializer));
+    }
+
+    protected MapStateFactoryBuilder<T> withStateType(StateType stateType) {
+        this.stateType = stateType;
+        return this;
+    }
+
+    protected MapStateFactoryBuilder<T> withCassandraConfig(Map cassandraConf) {
+        this.cassandraConfig = cassandraConf;
+        return this;
+    }
+
+    /** Sets the cache size forwarded to {@link CassandraMapStateFactory#withCache(int)}. */
+    public MapStateFactoryBuilder<T> withCache(int cacheSize) {
+        this.cacheSize = cacheSize;
+        return this;
+    }
+
+    /**
+     * Builds the state factory from the collected settings.
+     *
+     * <p>A select statement (state columns, filtered by every key column) and an insert
+     * statement (key columns followed by state columns) are generated with bind markers
+     * and handed to the backing map options.
+     *
+     * @return the configured state factory; never {@code null}
+     * @throws NullPointerException     if keyspace, table, keys, state mapper or state type is missing
+     * @throws IllegalArgumentException if no key was given or the state type is not one of the handled types
+     */
+    public StateFactory build() {
+
+        Objects.requireNonNull(keyspace, "A keyspace is required.");
+        Objects.requireNonNull(table, "A table name is required.");
+        Objects.requireNonNull(keys, "At least one key must be specified.");
+        if (keys.length == 0) {
+            throw new IllegalArgumentException("At least one key must be specified.");
+        }
+        Objects.requireNonNull(stateMapper, "A state mapper must be specified.");
+        Objects.requireNonNull(stateType, "A state type must be specified.");
+
+        List<String> stateFields = stateMapper.getStateFields()
+                .toList();
+
+        String[] stateFieldsArray = stateFields.toArray(new String[stateFields.size()]);
+
+        List<String> allFields = new ArrayList<>();
+        Collections.addAll(allFields, keys);
+        allFields.addAll(stateFields);
+
+        // Build get query
+        Select.Where getQuery = select(stateFieldsArray)
+                .from(keyspace, table)
+                .where();
+
+        for (String key : keys) {
+            getQuery.and(eq(key, bindMarker()));
+        }
+
+        // Render each statement once and reuse the text for both binding and logging.
+        String getCql = getQuery.toString();
+
+        CQLStatementTupleMapper get = boundQuery(getCql)
+                .bind(all())
+                .build();
+
+        // Build put query
+        Insert putStatement = insertInto(keyspace, table)
+                .values(allFields, Collections.<Object>nCopies(allFields.size(), bindMarker()));
+
+        String putCql = putStatement.toString();
+
+        CQLStatementTupleMapper put = boundQuery(putCql)
+                .bind(all())
+                .build();
+
+        CassandraBackingMap.Options options = new CassandraBackingMap.Options<T>(new CassandraContext())
+                .withGetMapper(get)
+                .withPutMapper(put)
+                .withStateMapper(stateMapper)
+                .withKeys(new Fields(keys))
+                .withMaxParallelism(maxParallelism);
+
+        logger.debug("Building factory with: \n  get: {}\n  put: {}\n  mapper: {}",
+                getCql,
+                putCql,
+                stateMapper);
+
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return CassandraMapStateFactory.nonTransactional(options, cassandraConfig)
+                        .withCache(cacheSize);
+            case TRANSACTIONAL:
+                return CassandraMapStateFactory.transactional(options, cassandraConfig)
+                        .withCache(cacheSize);
+            case OPAQUE:
+                return CassandraMapStateFactory.opaque(options, cassandraConfig)
+                        .withCache(cacheSize);
+            default:
+                // Reject unknown types loudly rather than handing callers a null factory.
+                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
new file mode 100644
index 0000000..3a36b07
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * State mapper that maps a tuple to separate state fields.
+ */
+public class NonTransactionalTupleStateMapper implements StateMapper<ITuple> {
+
+    private final Fields fields;
+
+    /**
+     * @param fields the names of the state fields, in the order their values are stored
+     */
+    public NonTransactionalTupleStateMapper(String... fields) {
+        this.fields = new Fields(fields);
+    }
+
+    /**
+     * @param fields the state fields, in the order their values are stored
+     */
+    public NonTransactionalTupleStateMapper(Fields fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return fields;
+    }
+
+    /**
+     * Returns the tuple's values as one entry per value, in tuple order.
+     *
+     * @param t the tuple holding the state
+     * @return the tuple's values
+     */
+    @Override
+    public Values toValues(ITuple t) {
+        // Values only offers a varargs (Object...) constructor, so the list has to be
+        // spread into an array; passing the List itself would produce a single-element
+        // Values holding the whole list instead of one entry per state field.
+        return new Values(t.getValues().toArray());
+    }
+
+    /**
+     * Rebuilds a tuple from the first entry of the given list.
+     *
+     * @param values the value lists read from the table; may be {@code null} or empty
+     * @return a tuple over the state fields, or {@code null} when there is nothing to read
+     */
+    @Override
+    public ITuple fromValues(List<Values> values) {
+        if (values == null || values.isEmpty()) {
+            return null;
+        }
+        return new SimpleTuple(fields, values.get(0));
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), fields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
new file mode 100644
index 0000000..882c9b1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * State mapper that spreads an opaque tuple value over separate state fields: the
+ * current transaction id, one column per tuple field for the current value, and one
+ * column per tuple field for the previous value.
+ */
+public class OpaqueTupleStateMapper implements StateMapper<OpaqueValue<ITuple>> {
+
+    private final Fields tupleFields;
+    private final Fields tableFields;
+
+    /**
+     * @param currTxIdField name of the column holding the current transaction id
+     * @param currPrefix    prefix prepended to each field name for the current value's columns
+     * @param prevPrefix    prefix prepended to each field name for the previous value's columns
+     * @param fields        the tuple field names
+     */
+    public OpaqueTupleStateMapper(String currTxIdField, String currPrefix, String prevPrefix, String... fields) {
+        this(currTxIdField, currPrefix, prevPrefix, new Fields(fields));
+    }
+
+    /**
+     * @param currTxIdField name of the column holding the current transaction id
+     * @param currPrefix    prefix prepended to each field name for the current value's columns
+     * @param prevPrefix    prefix prepended to each field name for the previous value's columns
+     * @param fields        the tuple fields
+     */
+    public OpaqueTupleStateMapper(String currTxIdField, String currPrefix, String prevPrefix, Fields fields) {
+        this.tupleFields = fields;
+
+        // Column layout: tx id, then all current-value columns, then all previous-value columns.
+        ArrayList<String> columns = new ArrayList<>();
+        columns.add(currTxIdField);
+        addPrefixed(columns, currPrefix, fields);
+        addPrefixed(columns, prevPrefix, fields);
+        this.tableFields = new Fields(columns);
+    }
+
+    /** Appends {@code prefix + name} to {@code target} for every field name. */
+    private static void addPrefixed(List<String> target, String prefix, Fields names) {
+        for (String name : names) {
+            target.add(prefix + name);
+        }
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return tableFields;
+    }
+
+    /**
+     * Flattens the opaque value into the column layout described on the class. A missing
+     * current or previous tuple contributes one {@code null} per tuple field.
+     */
+    @Override
+    public Values toValues(OpaqueValue<ITuple> tuple) {
+        Values row = new Values();
+        row.add(tuple.getCurrTxid());
+        appendFieldValues(row, tuple.getCurr());
+        appendFieldValues(row, tuple.getPrev());
+        return row;
+    }
+
+    /** Adds one entry per tuple field to {@code row}: the field's value, or null when {@code source} is null. */
+    private void appendFieldValues(Values row, ITuple source) {
+        for (String name : tupleFields) {
+            row.add(source == null ? null : source.getValueByField(name));
+        }
+    }
+
+    /**
+     * Rebuilds the opaque value from the first entry of the given list. A current or
+     * previous tuple whose columns are all {@code null} is restored as {@code null}.
+     *
+     * @return the restored value, or {@code null} when the list is {@code null} or empty
+     */
+    @Override
+    public OpaqueValue<ITuple> fromValues(List<Values> valuesList) {
+        if (valuesList == null || valuesList.size() == 0) {
+            return null;
+        }
+
+        Values row = valuesList.get(0);
+        Long currTx = (Long) row.get(0);
+
+        // Index 0 is the tx id; the two tuple sections follow back to back.
+        int width = tupleFields.size();
+        SimpleTuple curr = readTuple(row, 1);
+        SimpleTuple prev = readTuple(row, 1 + width);
+
+        return new OpaqueValue<ITuple>(currTx, curr, prev);
+    }
+
+    /** Reads one tuple section starting at {@code offset}; yields null when every value read is null. */
+    private SimpleTuple readTuple(Values row, int offset) {
+        SimpleTuple result = new SimpleTuple(tupleFields);
+        int position = offset;
+        for (String name : tupleFields) {
+            result.put(name, row.get(position));
+            position++;
+        }
+        return isAllNull(result) ? null : result;
+    }
+
+    private boolean isAllNull(SimpleTuple tuple) {
+        boolean allNull = true;
+        for (Object value : tuple.getValues()) {
+            if (value != null) {
+                allNull = false;
+                break;
+            }
+        }
+        return allNull;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), tableFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
new file mode 100644
index 0000000..b4ec6c8
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * State mapper that stores the whole state value in a single binary field, using a
+ * {@link Serializer} to convert it to and from bytes.
+ *
+ * @param <T> the type of the state value
+ */
+public class SerializedStateMapper<T> implements StateMapper<T> {
+
+    private final Fields stateFields;
+    private final Serializer<T> serializer;
+
+    /**
+     * @param fieldName  the name of the field holding the serialized state
+     * @param serializer the serializer used to convert the state to and from bytes
+     */
+    public SerializedStateMapper(String fieldName, Serializer<T> serializer) {
+        this.stateFields = new Fields(fieldName);
+        this.serializer = serializer;
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return stateFields;
+    }
+
+    /**
+     * Serializes the value and wraps the bytes in a {@link ByteBuffer}.
+     */
+    @Override
+    public Values toValues(T value) {
+        byte[] serialized = serializer.serialize(value);
+        return new Values(ByteBuffer.wrap(serialized));
+    }
+
+    /**
+     * Deserializes the state from the single entry of the given list.
+     *
+     * @param values the value lists read from the table; may be {@code null} or empty
+     * @return the deserialized state, or {@code null} when there is no entry or the
+     *         stored buffer is {@code null}
+     * @throws IllegalArgumentException if more than one entry is given
+     */
+    @Override
+    public T fromValues(List<Values> values) {
+        if (values == null || values.isEmpty()) {
+            return null;
+        }
+        if (values.size() > 1) {
+            throw new IllegalArgumentException("Can only convert single values, " + values.size() + " encountered");
+        }
+
+        ByteBuffer bytes = (ByteBuffer) values.get(0).get(0);
+        if (bytes == null) {
+            // An entry without a stored buffer is treated as absent state.
+            return null;
+        }
+
+        // ByteBuffer.array() exposes the whole backing array regardless of position,
+        // limit and array offset, and throws for read-only or direct buffers. Copy
+        // exactly the readable bytes instead; duplicate() keeps the caller's buffer
+        // position untouched.
+        byte[] raw = new byte[bytes.remaining()];
+        bytes.duplicate().get(raw);
+        return serializer.deserialize(raw);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s, serializer: %s}", this.getClass().getSimpleName(), stateFields, serializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
new file mode 100644
index 0000000..cc03a09
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * State mapper that stores each part of a state value in its own field: the value
+ * alone for non-transactional state, (txid, value) for transactional state and
+ * (current txid, current value, previous value) for opaque state.
+ *
+ * @param <T> the type of the state value
+ */
+public class SimpleStateMapper<T> implements StateMapper<T> {
+
+    private final Fields fields;
+    private final StateType stateType;
+
+    /**
+     * @param fields    the state fields, in the order matching the state type's layout
+     * @param stateType the state type that decides how values are split and rebuilt
+     */
+    public SimpleStateMapper(Fields fields, StateType stateType) {
+        this.fields = fields;
+        this.stateType = stateType;
+    }
+
+    /**
+     * Creates a mapper for opaque state. Note that the resulting field order is
+     * (txIdField, field, previousField), matching the order written by {@link #toValues(Object)}.
+     *
+     * @param txIdField     the field holding the current transaction id
+     * @param previousField the field holding the previous value
+     * @param field         the field holding the current value
+     */
+    public static <U> StateMapper<OpaqueValue<U>> opaque(String txIdField, String previousField, String field) {
+        return new SimpleStateMapper<>(new Fields(txIdField, field, previousField), StateType.OPAQUE);
+    }
+
+    /**
+     * Creates a mapper for transactional state with fields (txIdField, field).
+     *
+     * @param txIdField the field holding the transaction id
+     * @param field     the field holding the value
+     */
+    public static <U> StateMapper<TransactionalValue<U>> transactional(String txIdField, String field) {
+        return new SimpleStateMapper<>(new Fields(txIdField, field), StateType.TRANSACTIONAL);
+    }
+
+    /**
+     * Creates a mapper for transactional state despite its name; kept so existing
+     * callers continue to compile.
+     *
+     * @deprecated misnamed, use {@link #transactional(String, String)} instead.
+     */
+    @Deprecated
+    public static <U> StateMapper<TransactionalValue<U>> opaque(String txIdField, String field) {
+        return transactional(txIdField, field);
+    }
+
+    /**
+     * Creates a mapper for non-transactional state stored in a single field.
+     *
+     * @param field the field holding the value
+     */
+    public static <U> StateMapper<U> nontransactional(String field) {
+        return new SimpleStateMapper<>(new Fields(field), StateType.NON_TRANSACTIONAL);
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return fields;
+    }
+
+    /**
+     * Splits the state value into its parts according to the state type.
+     *
+     * @return the parts in field order, or {@code null} when {@code value} is {@code null}
+     * @throws IllegalStateException if the state type is not one of the handled types
+     */
+    @Override
+    public Values toValues(T value) {
+        if (value == null) {
+            return null;
+        }
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return new Values(value);
+            case TRANSACTIONAL:
+                TransactionalValue transactional = (TransactionalValue) value;
+                return new Values(transactional.getTxid(), transactional.getVal());
+            case OPAQUE:
+                OpaqueValue opaque = (OpaqueValue) value;
+                return new Values(opaque.getCurrTxid(), opaque.getCurr(), opaque.getPrev());
+            default:
+                throw new IllegalStateException("Unknown state type " + stateType);
+        }
+    }
+
+    /**
+     * Rebuilds the state value from the single entry of the given list.
+     *
+     * @return the state value, or {@code null} when the list is {@code null} or empty
+     *         or its only entry is {@code null}
+     * @throws IllegalStateException if more than one entry is given or the state type
+     *                               is not one of the handled types
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public T fromValues(List<Values> valuesSet) {
+        if (valuesSet == null || valuesSet.isEmpty()) {
+            return null;
+        }
+        if (valuesSet.size() > 1) {
+            throw new IllegalStateException("State query returned multiple results.");
+        }
+
+        Values values = valuesSet.get(0);
+        if (values == null) {
+            return null;
+        }
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return (T) values.get(0);
+            case TRANSACTIONAL:
+                return (T) new TransactionalValue((Long) values.get(0), values.get(1));
+            case OPAQUE:
+                return (T) new OpaqueValue((Long) values.get(0), values.get(1), values.get(2));
+            default:
+                throw new IllegalStateException("Unknown state type " + stateType);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s, stateType: %s}", this.getClass().getSimpleName(), fields, stateType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
new file mode 100644
index 0000000..d78f6d7
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class for passing around ordered key/value data with an immutable key set.
+ *
+ * <p>The value list is padded with {@code null} so that it holds at least one entry per key.
+ */
+public class SimpleTuple implements ITuple, Serializable {
+
+    private static final long serialVersionUID = -4656331293513898312L;
+
+    private final List<String> keys;
+    private List<Object> values;
+
+    /**
+     * Creates a tuple with the given keys and a copy of the given values.
+     *
+     * @param keyFields the keys of the tuple
+     * @param values    the values, in key order; missing trailing values become {@code null}
+     */
+    public SimpleTuple(Fields keyFields, List<Object> values) {
+        this.keys = keyFields.toList();
+        this.values = new ArrayList<>(values);
+        padValues();
+    }
+
+    /**
+     * Creates a tuple with the given keys and the concatenation of the given value lists.
+     *
+     * @param keyFields the keys of the tuple
+     * @param values    value lists concatenated in order; missing trailing values become {@code null}
+     */
+    @SafeVarargs // the array is only read, never stored or written to
+    public SimpleTuple(Fields keyFields, List<Object>... values) {
+        this.keys = keyFields.toList();
+        this.values = new ArrayList<>();
+        for (List<Object> valueList : values) {
+            this.values.addAll(valueList);
+        }
+        padValues();
+    }
+
+    /** Appends nulls until there is at least one value slot per key. */
+    private void padValues() {
+        while (values.size() < keys.size()) {
+            values.add(null);
+        }
+    }
+
+    /**
+     * Returns the position of an existing field.
+     *
+     * @throws IllegalArgumentException if the field is not one of this tuple's keys
+     */
+    private int indexOfExisting(String field) {
+        int index = keys.indexOf(field);
+        if (index < 0) {
+            throw new IllegalArgumentException("Field " + field + " does not exist.");
+        }
+        return index;
+    }
+
+    /**
+     * Sets the value of an existing field.
+     *
+     * @return this tuple, for call chaining
+     * @throws IllegalArgumentException if the field is not one of this tuple's keys
+     */
+    public SimpleTuple put(String key, Object value) {
+        values.set(indexOfExisting(key), value);
+        return this;
+    }
+
+    /**
+     * Replaces all values with a copy of the given list, padded with {@code null} up to
+     * the number of keys so that every key keeps a value slot.
+     *
+     * @return this tuple, for call chaining
+     */
+    public SimpleTuple setValues(List<Object> values) {
+        this.values = new ArrayList<>(values);
+        padValues();
+        return this;
+    }
+
+    @Override
+    public int size() {
+        return keys.size();
+    }
+
+    @Override
+    public boolean contains(String field) {
+        return keys.contains(field);
+    }
+
+    @Override
+    public Fields getFields() {
+        return new Fields(keys);
+    }
+
+    /** Returns the position of the field, or -1 when it is not one of this tuple's keys. */
+    @Override
+    public int fieldIndex(String field) {
+        return keys.indexOf(field);
+    }
+
+    @Override
+    public List<Object> select(Fields selector) {
+        List<Object> selected = new ArrayList<>();
+        for (String field : selector) {
+            selected.add(getValueByField(field));
+        }
+        return selected;
+    }
+
+    // Positional accessors: plain casts of the stored value at index i.
+
+    @Override
+    public Object getValue(int i) {
+        return values.get(i);
+    }
+
+    @Override
+    public String getString(int i) {
+        return (String) values.get(i);
+    }
+
+    @Override
+    public Integer getInteger(int i) {
+        return (Integer) values.get(i);
+    }
+
+    @Override
+    public Long getLong(int i) {
+        return (Long) values.get(i);
+    }
+
+    @Override
+    public Boolean getBoolean(int i) {
+        return (Boolean) values.get(i);
+    }
+
+    @Override
+    public Short getShort(int i) {
+        return (Short) values.get(i);
+    }
+
+    @Override
+    public Byte getByte(int i) {
+        return (Byte) values.get(i);
+    }
+
+    @Override
+    public Double getDouble(int i) {
+        return (Double) values.get(i);
+    }
+
+    @Override
+    public Float getFloat(int i) {
+        return (Float) values.get(i);
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        return (byte[]) values.get(i);
+    }
+
+    // Named accessors: all go through getValueByField and cast the result.
+
+    /**
+     * Returns the value stored for the given field.
+     *
+     * @throws IllegalArgumentException if the field is not one of this tuple's keys
+     */
+    @Override
+    public Object getValueByField(String field) {
+        // Without the explicit check an unknown field would surface as an opaque
+        // IndexOutOfBoundsException for index -1.
+        return values.get(indexOfExisting(field));
+    }
+
+    @Override
+    public String getStringByField(String field) {
+        return (String) getValueByField(field);
+    }
+
+    @Override
+    public Integer getIntegerByField(String field) {
+        return (Integer) getValueByField(field);
+    }
+
+    @Override
+    public Long getLongByField(String field) {
+        return (Long) getValueByField(field);
+    }
+
+    @Override
+    public Boolean getBooleanByField(String field) {
+        return (Boolean) getValueByField(field);
+    }
+
+    @Override
+    public Short getShortByField(String field) {
+        return (Short) getValueByField(field);
+    }
+
+    @Override
+    public Byte getByteByField(String field) {
+        return (Byte) getValueByField(field);
+    }
+
+    @Override
+    public Double getDoubleByField(String field) {
+        return (Double) getValueByField(field);
+    }
+
+    @Override
+    public Float getFloatByField(String field) {
+        return (Float) getValueByField(field);
+    }
+
+    @Override
+    public byte[] getBinaryByField(String field) {
+        return (byte[]) getValueByField(field);
+    }
+
+    /** Returns a read-only view of the values. */
+    @Override
+    public List<Object> getValues() {
+        return Collections.unmodifiableList(values);
+    }
+
+    /** Returns a read-only view of the keys. */
+    public List<String> getKeys() {
+        return Collections.unmodifiableList(keys);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
new file mode 100644
index 0000000..ef0c783
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Converts a state value to and from the values stored in its state fields.
+ *
+ * @param <T> the type of the state value
+ */
+public interface StateMapper<T> extends Serializable {
+
+    /**
+     * Returns the fields the state is stored in. {@code MapStateFactoryBuilder} uses
+     * them as the selected columns of its get query and, after the key columns, as
+     * the inserted columns of its put query.
+     */
+    Fields getStateFields();
+
+    /**
+     * Converts a state value into the values to store.
+     *
+     * @param value the state value to convert
+     * @return the values to store
+     */
+    Values toValues(T value);
+
+    /**
+     * Converts stored values back into a state value.
+     *
+     * @param values the stored value lists; the implementations in this package
+     *               return {@code null} for an empty list
+     * @return the state value
+     */
+    T fromValues(List<Values> values);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
new file mode 100644
index 0000000..83332b9
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * State mapper that stores a transactional tuple as a transaction id field followed by one
+ * state field per tuple field.
+ */
+public class TransactionalTupleStateMapper implements StateMapper<TransactionalValue<ITuple>> {
+
+    private final Fields tupleFields;
+    private final Fields tableFields;
+
+    /** Varargs convenience form of {@link #TransactionalTupleStateMapper(String, Fields)}. */
+    public TransactionalTupleStateMapper(String txIdField, String... fields) {
+        this(txIdField, new Fields(fields));
+    }
+
+    /** Builds the state fields as {@code txIdField} followed by every entry of {@code fields}. */
+    public TransactionalTupleStateMapper(String txIdField, Fields fields) {
+        this.tupleFields = fields;
+        List<String> columns = new ArrayList<>();
+        columns.add(txIdField);
+        columns.addAll(fields.toList());
+        this.tableFields = new Fields(columns);
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return tableFields;
+    }
+
+    /** Returns the transaction id followed by each tuple field value (all null for a null tuple). */
+    @Override
+    public Values toValues(TransactionalValue<ITuple> tuple) {
+        Values row = new Values();
+        row.add(tuple.getTxid());
+        ITuple payload = tuple.getVal();
+        for (String name : tupleFields) {
+            row.add(payload == null ? null : payload.getValueByField(name));
+        }
+        return row;
+    }
+
+    /**
+     * Rebuilds the transactional value from the first entry of {@code valuesList}.
+     *
+     * @return {@code null} for a null or empty list; otherwise the stored transaction id paired
+     *         with the tuple, or with {@code null} when every tuple field value is null
+     */
+    @Override
+    public TransactionalValue<ITuple> fromValues(List<Values> valuesList) {
+        if (valuesList == null || valuesList.isEmpty()) {
+            return null;
+        }
+        Values row = valuesList.get(0);
+        Long txId = (Long) row.get(0);
+        SimpleTuple restored = new SimpleTuple(tupleFields);
+        for (int i = 0; i < tupleFields.size(); i++) {
+            // Index 0 holds the transaction id, so tuple field i is read from index i + 1.
+            restored.put(tupleFields.get(i), row.get(i + 1));
+        }
+        ITuple tuple = hasNonNull(restored.getValues()) ? restored : null;
+        return new TransactionalValue<ITuple>(txId, tuple);
+    }
+
+    /** Tells whether {@code items} contains at least one non-null element. */
+    private static boolean hasNonNull(Iterable<?> items) {
+        for (Object item : items) {
+            if (item != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), tableFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
new file mode 100644
index 0000000..0d03fed
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.AsyncResultSetHandler;
+import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A result set mapper implementation which runs requests in parallel and waits for them all to finish.
+ */
+public class TridentAyncCQLResultSetValuesMapper implements AyncCQLResultSetValuesMapper {
+    private final Fields outputDeclaredFields;
+    private final Semaphore throttle;
+
+    public TridentAyncCQLResultSetValuesMapper(Fields outputDeclaredFields, Semaphore throttle) {
+        this.outputDeclaredFields = outputDeclaredFields;
+        this.throttle = throttle;
+    }
+
+    @Override
+    public List<List<Values>> map(Session session, List<Statement> statements, final List<ITuple> tuples) {
+        AsyncExecutor<Integer> executor = AsyncExecutorProvider.getLocal(session, AsyncResultHandler.NO_OP_HANDLER);
+        final List<Integer> indexes = new ArrayList<>();
+        final List<List<Values>> results = new ArrayList<>();
+        for (int i = 0; i < statements.size(); i++) {
+            indexes.add(i);
+            results.add(null);
+        }
+        SettableFuture<List<Integer>> result = executor.execAsync(statements, indexes, throttle, new AsyncResultSetHandler<Integer>() {
+            @Override
+            public void success(Integer index, ResultSet resultSet) {
+                if (outputDeclaredFields != null) {
+                    List<Values> thisResult = new ArrayList<>();
+                    for (Row row : resultSet) {
+                        final Values values = new Values();
+                        for (String field : outputDeclaredFields) {
+                            ITuple tuple = tuples.get(index);
+                            if (tuple.contains(field)) {
+                                values.add(tuple.getValueByField(field));
+                            } else {
+                                values.add(row.getObject(field));
+                            }
+                        }
+                        thisResult.add(values);
+                    }
+                    results.set(index, thisResult);
+                }
+            }
+
+            @Override
+            public void failure(Throwable t, Integer index) {
+                // Exceptions are captured and thrown at the end of the batch by the executor
+            }
+
+        });
+
+        try {
+            // Await all results
+            result.get();
+        } catch (Exception e) {
+            throw new FailedException(e.getMessage(), e);
+        }
+
+        return results;
+    }
+
+    protected List<Values> handleResult(ResultSet resultSet, ITuple tuple) {
+        List<Values> list = new ArrayList<>();
+        for (Row row : resultSet) {
+            final Values values = new Values();
+            for (String field : outputDeclaredFields) {
+                if (tuple.contains(field)) {
+                    values.add(tuple.getValueByField(field));
+                } else {
+                    values.add(row.getObject(field));
+                }
+            }
+            list.add(values);
+        }
+        return list;
+    }
+
+
+
+}


[3/4] storm git commit: Merge branch 'master' of https://github.com/ef-labs/storm into STORM-1369-merge

Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/ef-labs/storm into STORM-1369-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/80d22b84
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/80d22b84
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/80d22b84

Branch: refs/heads/master
Commit: 80d22b84827c67fb4596b9b0eaf0b88c6693258a
Parents: b346c1c eb6c308
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 5 09:45:03 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 5 09:45:03 2017 +0900

----------------------------------------------------------------------
 external/storm-cassandra/README.md              | 159 +++++++++---
 external/storm-cassandra/pom.xml                |   9 +
 .../storm/cassandra/executor/AsyncExecutor.java | 167 ++++++++++++-
 .../executor/AsyncExecutorProvider.java         |   2 +-
 .../executor/AsyncResultSetHandler.java         |  58 +++++
 .../query/AyncCQLResultSetValuesMapper.java     |  36 +++
 .../trident/state/CassandraBackingMap.java      | 241 +++++++++++++++++++
 .../trident/state/CassandraMapStateFactory.java | 106 ++++++++
 .../trident/state/MapStateFactoryBuilder.java   | 226 +++++++++++++++++
 .../state/NonTransactionalTupleStateMapper.java |  64 +++++
 .../trident/state/OpaqueTupleStateMapper.java   | 127 ++++++++++
 .../trident/state/SerializedStateMapper.java    |  67 ++++++
 .../trident/state/SimpleStateMapper.java        | 104 ++++++++
 .../cassandra/trident/state/SimpleTuple.java    | 213 ++++++++++++++++
 .../cassandra/trident/state/StateMapper.java    |  35 +++
 .../state/TransactionalTupleStateMapper.java    | 105 ++++++++
 .../TridentAyncCQLResultSetValuesMapper.java    | 117 +++++++++
 .../testtools/EmbeddedCassandraResource.java    | 193 +++++++++++++++
 .../storm/cassandra/trident/MapStateTest.java   | 230 ++++++++++++++++++
 .../src/test/resources/cassandra.yaml           |  39 +++
 20 files changed, 2261 insertions(+), 37 deletions(-)
----------------------------------------------------------------------