You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/11 07:41:21 UTC

[2/6] storm git commit: STORM-1211 Added trident support for Cassandra connector.

STORM-1211 Added trident support for Cassandra connector.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef9479d0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef9479d0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef9479d0

Branch: refs/heads/master
Commit: ef9479d06add8f70517d410d7e12a6c21940bd75
Parents: 10b33a6
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 3 09:43:05 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Dec 8 11:40:23 2015 +0530

----------------------------------------------------------------------
 .../cassandra/client/impl/DefaultClient.java    |  22 +--
 .../query/CQLResultSetValuesMapper.java         |  36 +++++
 .../query/impl/BoundStatementMapperBuilder.java |   8 +-
 .../cassandra/trident/state/CassandraQuery.java |  44 ++++++
 .../cassandra/trident/state/CassandraState.java | 149 +++++++++++++++++++
 .../trident/state/CassandraStateFactory.java    |  51 +++++++
 .../trident/state/CassandraStateUpdater.java    |  36 +++++
 .../state/TridentResultSetValuesMapper.java     |  63 ++++++++
 .../cassandra/DynamicStatementBuilderTest.java  |  46 +++---
 .../apache/storm/cassandra/WeatherSpout.java    |   4 +-
 .../bolt/BatchCassandraWriterBoltTest.java      |  13 +-
 .../cassandra/bolt/CassandraWriterBoltTest.java |  13 +-
 .../cassandra/trident/TridentTopologyTest.java  | 125 ++++++++++++++++
 .../cassandra/trident/WeatherBatchSpout.java    | 104 +++++++++++++
 .../src/test/resources/schema.cql               |  12 +-
 15 files changed, 679 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
index 945d0a8..6eb8e16 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -36,25 +36,25 @@ import java.util.Set;
  * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
  */
 public class DefaultClient implements SimpleClient, Closeable, Serializable {
-    
+
     private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
-    
+
     private String keyspace;
 
     private Cluster cluster;
-    
+
     private Session session;
 
     /**
      * Create a new {@link DefaultClient} instance.
-     * 
+     *
      * @param cluster a cassandra cluster client.
      */
     public DefaultClient(Cluster cluster, String keyspace) {
         Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
         this.cluster = cluster;
         this.keyspace = keyspace;
-        
+
     }
 
     public Set<Host> getAllHosts() {
@@ -71,14 +71,15 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
         Thread thread = Thread.currentThread();
         return thread.getName();
     }
+
     /**
      * {@inheritDoc}
      */
     @Override
     public synchronized Session connect() throws NoHostAvailableException {
-        if( isDisconnected() ) {
+        if (isDisconnected()) {
             LOG.info("Connected to cluster: {}", cluster.getClusterName());
-            for ( Host host : getAllHosts())
+            for (Host host : getAllHosts())
                 LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
 
             LOG.info("Connect to cluster using keyspace %s", keyspace);
@@ -87,7 +88,7 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
             LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName());
         }
 
-        if( session.isClosed() ) {
+        if (session.isClosed()) {
             LOG.warn("Session has been closed - create new one!");
             this.session = cluster.newSession();
         }
@@ -105,13 +106,14 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
      * {@inheritDoc}
      */
     @Override
-    public void close( ) {
-        if( cluster != null && !cluster.isClosed() ) {
+    public void close() {
+        if (cluster != null && !cluster.isClosed()) {
             LOG.info("Try to close connection to cluster: {}", cluster.getClusterName());
             session.close();
             cluster.close();
         }
     }
+
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
new file mode 100644
index 0000000..80b1173
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ *
+ */
+public interface CQLResultSetValuesMapper extends Serializable {
+
+    List<List<Values>> map(Session session, Statement statement, ITuple tuple);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
index 4348253..b7ded14 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.storm.cassandra.query.ContextQuery.*;
+import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
 
 
 public class BoundStatementMapperBuilder implements Serializable {
@@ -64,6 +64,7 @@ public class BoundStatementMapperBuilder implements Serializable {
 
         private final CQLValuesTupleMapper mapper;
 
+        // should be a lru-map or weakhashmap else this may lead to memory leaks.
         private Map<String, PreparedStatement> cache = new HashMap<>();
 
         /**
@@ -86,7 +87,8 @@ public class BoundStatementMapperBuilder implements Serializable {
             final String query = contextQuery.resolves(config, tuple);
             Object[] objects = values.values().toArray(new Object[values.size()]);
             PreparedStatement statement = getPreparedStatement(session, query);
-            return Arrays.asList((Statement)statement.bind(objects));
+            // todo bind objects in the same sequence as the statement expects.
+            return Arrays.asList((Statement) statement.bind(objects));
         }
 
         /**
@@ -97,7 +99,7 @@ public class BoundStatementMapperBuilder implements Serializable {
          */
         private PreparedStatement getPreparedStatement(Session session, String query) {
             PreparedStatement statement = cache.get(query);
-            if( statement == null) {
+            if (statement == null) {
                 statement = session.prepare(query);
                 cache.put(query, statement);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
new file mode 100644
index 0000000..085cbca
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class CassandraQuery extends BaseQueryFunction<CassandraState, List<Values>> {
+
+    @Override
+    public List<List<Values>> batchRetrieve(CassandraState state, List<TridentTuple> tridentTuples) {
+        return state.batchRetrieve(tridentTuples);
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, List<Values> valuesList, TridentCollector collector) {
+        for (Values values : valuesList) {
+            collector.emit(values);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
new file mode 100644
index 0000000..05618ee
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class);
+
+    private final Map conf;
+    private final Options options;
+
+    private Session session;
+    private SimpleClient client;
+
+    public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) {
+        this.conf = conf;
+        this.options = options;
+    }
+
+    public static final class Options implements Serializable {
+        private final SimpleClientProvider clientProvider;
+        private CQLStatementTupleMapper cqlStatementTupleMapper;
+        private CQLResultSetValuesMapper cqlResultSetValuesMapper;
+        private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED;
+
+
+        public Options(SimpleClientProvider clientProvider) {
+            this.clientProvider = clientProvider;
+        }
+
+        public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) {
+            this.cqlStatementTupleMapper = cqlStatementTupleMapper;
+            return this;
+        }
+
+        public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+            this.cqlResultSetValuesMapper = cqlResultSetValuesMapper;
+            return this;
+        }
+
+        public Options withBatching(BatchStatement.Type batchingType) {
+            this.batchingType = batchingType;
+            return this;
+        }
+
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop");
+    }
+
+    public void prepare() {
+        client = options.clientProvider.getClient(conf);
+        session = client.connect();
+    }
+
+    public void cleanup() {
+        session.close();
+        client.close();
+    }
+
+    public void updateState(List<TridentTuple> tuples, final TridentCollector collector) {
+        Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+
+        List<Statement> statements = new ArrayList<>();
+        for (TridentTuple tuple : tuples) {
+            statements.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple));
+        }
+        BatchStatement batchStatement = new BatchStatement(options.batchingType);
+        batchStatement.addAll(statements);
+
+        try {
+            session.execute(batchStatement);
+        } catch (Exception e) {
+            LOG.warn("Batch write operation is failed.");
+            collector.reportError(e);
+            throw new FailedException(e);
+        }
+
+    }
+
+    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+        Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+        Preconditions.checkNotNull(options.cqlResultSetValuesMapper, "CassandraState.Options should have cqlResultSetValuesMapper");
+
+        List<List<Values>> batchRetrieveResult = new ArrayList<>();
+        try {
+            for (TridentTuple tridentTuple : tridentTuples) {
+                List<Statement> statements = options.cqlStatementTupleMapper.map(conf, session, tridentTuple);
+                for (Statement statement : statements) {
+                    List<List<Values>> values = options.cqlResultSetValuesMapper.map(session, statement, tridentTuple);
+                    batchRetrieveResult.addAll(values);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch retrieve operation is failed.");
+            throw new FailedException(e);
+        }
+        return batchRetrieveResult;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
new file mode 100644
index 0000000..285fa38
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraStateFactory implements StateFactory {
+    private final CassandraState.Options options;
+
+    public CassandraStateFactory(CassandraState.Options options) {
+        this.options = options;
+    }
+
+    public CassandraStateFactory(CQLStatementTupleMapper cqlStatementTupleMapper, CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+        this(new CassandraState.Options(new CassandraContext()).withCQLStatementTupleMapper(cqlStatementTupleMapper).withCQLResultSetValuesMapper(cqlResultSetValuesMapper));
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        CassandraState cassandraState = new CassandraState(conf, metrics, partitionIndex, numPartitions, options);
+        cassandraState.prepare();
+
+        return cassandraState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
new file mode 100644
index 0000000..6453198
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class CassandraStateUpdater extends BaseStateUpdater<CassandraState> {
+
+    @Override
+    public void updateState(CassandraState state, List<TridentTuple> list, TridentCollector collector) {
+        state.updateState(list, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
new file mode 100644
index 0000000..be7bda1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *
+ */
+public class TridentResultSetValuesMapper implements CQLResultSetValuesMapper {
+    private Fields outputDeclaredFields;
+
+    public TridentResultSetValuesMapper(Fields outputDeclaredFields) {
+        this.outputDeclaredFields = outputDeclaredFields;
+    }
+
+    @Override
+    public List<List<Values>> map(Session session, Statement statement, ITuple tuple) {
+        List<List<Values>> list = new ArrayList<>();
+        ResultSet resultSet = session.execute(statement);
+        for (Row row : resultSet) {
+            final Values values = new Values();
+            for (String field : outputDeclaredFields) {
+                if (tuple.contains(field)) {
+                    values.add(tuple.getValueByField(field));
+                } else {
+                    values.add(row.getObject(field));
+                }
+            }
+            list.add(new LinkedList<Values>() {{
+                add(values);
+            }});
+        }
+        return list;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 22e12da..843bf6b 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -36,7 +36,15 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
 import static org.mockito.Mockito.when;
 
 public class DynamicStatementBuilderTest {
@@ -46,14 +54,14 @@ public class DynamicStatementBuilderTest {
     private static final Tuple mockTuple;
 
     @Rule
-    public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather"));
+    public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
 
     static {
         mockTuple = Mockito.mock(Tuple.class);
-        when(mockTuple.getValueByField("weatherstation_id")).thenReturn("1");
+        when(mockTuple.getValueByField("weather_station_id")).thenReturn("1");
         when(mockTuple.getValueByField("event_time")).thenReturn(NOW);
         when(mockTuple.getValueByField("temperature")).thenReturn("0°C");
-        when(mockTuple.getFields()).thenReturn(new Fields("weatherstation_id", "event_time", "temperature"));
+        when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
     }
 
     @Test
@@ -63,51 +71,51 @@ public class DynamicStatementBuilderTest {
                         insertInto("weather", "temperature").values(all()),
                         insertInto("weather", "temperature").values(all())
                 ),
-                "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
-                "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
+                "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
+                "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
         );
     }
 
     @Test
     public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
         executeStatementAndAssert(insertInto("weather", "temperature").values(all()).build(),
-                "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+                "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
     }
 
     @Test
     public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
         executeStatementAndAssert(insertInto("temperature").values(all()).build(),
-                "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+                "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
     }
 
     @Test
     public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        executeStatementAndAssert(insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature"))).build(),
-                "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+        executeStatementAndAssert(insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature"))).build(),
+                "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
     }
 
     @Test
     public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
         executeBatchStatementAndAssert(loggedBatch(
-                insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature")))
-        ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+                insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
+        ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
     }
 
     @Test
     public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
         executeBatchStatementAndAssert(unLoggedBatch(
-                insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature")))
-        ),  "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+                insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
+        ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
     }
 
     private void executeBatchStatementAndAssert(CQLStatementTupleMapper mapper, String... results) {
         List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
 
-        BatchStatement statement = (BatchStatement)map.get(0);
+        BatchStatement statement = (BatchStatement) map.get(0);
         Collection<Statement> statements = statement.getStatements();
         Assert.assertEquals(results.length, statements.size());
 
-        for(Statement s : statements)
+        for (Statement s : statements)
             Assert.assertTrue(Arrays.asList(results).contains(s.toString()));
     }
 
@@ -116,15 +124,15 @@ public class DynamicStatementBuilderTest {
         List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
 
         List<String> listExpected = Arrays.asList(expected);
-        for( int i=0; i< map.size(); i++) {
+        for (int i = 0; i < map.size(); i++) {
             Assert.assertEquals(listExpected.get(i), map.get(i).toString());
         }
     }
 
     @Test
     public void shouldBuildStaticBoundStatement() {
-        CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weatherstation_id, event_time, temperature) VALUES(?, ?, ?)")
-                .bind(with(field("weatherstation_id"), field("event_time").now(), field("temperature")));
+        CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weather_station_id, event_time, temperature) VALUES(?, ?, ?)")
+                .bind(with(field("weather_station_id"), field("event_time").now(), field("temperature")));
         List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
         Statement statement = map.get(0);
         Assert.assertNotNull(statement);

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
index 49a1f80..0b7d8a5 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
@@ -52,7 +52,7 @@ public class WeatherSpout extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        outputFieldsDeclarer.declare(new Fields("weatherstation_id", "temperature"));
+        outputFieldsDeclarer.declare(new Fields("weather_station_id", "temperature"));
     }
 
     @Override
@@ -77,7 +77,7 @@ public class WeatherSpout extends BaseRichSpout {
 
     @Override
     public void nextTuple() {
-        if(  emit.get() < maxQueries.get() ) {
+        if (emit.get() < maxQueries.get()) {
             spoutOutputCollector.emit(new Values(stationID, "38°C"), emit.incrementAndGet());
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 4253189..80eb95a 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -26,7 +26,9 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
 
 
 /**
@@ -34,16 +36,17 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
  */
 public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
 
-    public static final String SPOUT_MOCK  = "spout-mock";
+    public static final String SPOUT_MOCK = "spout-mock";
     public static final String BOLT_WRITER = "writer";
 
-    @Test @Ignore("The sleep method should be used in tests")
+    @Test
+    @Ignore("The sleep method should be used in tests")
     public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
         executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
     }
 
     private SimpleCQLStatementTupleMapper getInsertInto() {
-        return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build();
+        return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
     }
 
     protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
@@ -57,7 +60,7 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
 
         runLocalTopologyAndWait(builder);
 
-        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'");
+        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
         Assert.assertEquals(maxQueries, rows.all().size());
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index 7717d4d..2aef53c 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -26,23 +26,26 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
 
 /**
  *
  */
 public class CassandraWriterBoltTest extends BaseTopologyTest {
 
-    public static final String SPOUT_MOCK  = "spout-mock";
+    public static final String SPOUT_MOCK = "spout-mock";
     public static final String BOLT_WRITER = "writer";
 
-    @Test @Ignore("The sleep method should be used in tests")
+    @Test
+    @Ignore("The sleep method should be used in tests")
     public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
         executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
     }
 
     private SimpleCQLStatementTupleMapper getInsertInto() {
-        return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build();
+        return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
     }
 
     protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
@@ -56,7 +59,7 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
 
         runLocalTopologyAndWait(builder);
 
-        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'");
+        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
         Assert.assertEquals(maxQueries, rows.all().size());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
new file mode 100644
index 0000000..c775919
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.bolt.BaseTopologyTest;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.trident.state.CassandraQuery;
+import org.apache.storm.cassandra.trident.state.CassandraState;
+import org.apache.storm.cassandra.trident.state.CassandraStateFactory;
+import org.apache.storm.cassandra.trident.state.CassandraStateUpdater;
+import org.apache.storm.cassandra.trident.state.TridentResultSetValuesMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+
+/**
+ *
+ */
+public class TridentTopologyTest extends BaseTopologyTest {
+
+    @Test
+    public void testTridentTopology() throws Exception {
+
+        Session session = cassandraCQLUnit.session;
+        String[] stationIds = {"station-1", "station-2", "station-3"};
+        for (int i = 1; i < 4; i++) {
+            ResultSet resultSet = session.execute("INSERT INTO weather.station(id, name) VALUES(?, ?)", stationIds[i-1],
+                    "Foo-Station-" + new Random().nextInt());
+        }
+
+        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.station");
+        for (Row row : rows) {
+            System.out.println("####### row = " + row);
+        }
+
+        WeatherBatchSpout weatherBatchSpout =
+                new WeatherBatchSpout(new Fields("weather_station_id", "temperature", "event_time"), 3,
+                        stationIds);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("cassandra-trident-stream", weatherBatchSpout);
+
+        CassandraStateFactory insertValuesStateFactory = getInsertTemperatureStateFactory();
+
+        CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
+
+        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+
+        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+
+        StormTopology stormTopology = topology.build();
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("wordCounter", getConfig(), stormTopology);
+        Thread.sleep(30 * 1000);
+
+        rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature");
+        Assert.assertTrue(rows.iterator().hasNext()); // basic sanity check
+
+        cluster.killTopology("wordCounter");
+        cluster.shutdown();
+    }
+
+    public static class PrintFunction extends BaseFunction {
+
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            System.out.println("####### tuple = " + tuple.getValues());
+            collector.emit(tuple.getValues());
+        }
+    }
+
+    private CassandraStateFactory getInsertTemperatureStateFactory() {
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+                "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        return new CassandraStateFactory(options);
+    }
+
+    public CassandraStateFactory getSelectWeatherStationStateFactory() {
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+                .bind(with(field("weather_station_id").as("id")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+        return new CassandraStateFactory(options);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
new file mode 100644
index 0000000..62f31db
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class WeatherBatchSpout implements IBatchSpout {
+    private final Fields outputFields;
+    private final int batchSize;
+    private final String[] stationIds;
+    private final HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+    List<Object>[] outputs;
+
+    public WeatherBatchSpout(Fields fields, int batchSize, String[] stationIds) {
+        this.outputFields = fields;
+        this.batchSize = batchSize;
+        this.stationIds = stationIds;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) {
+
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> batch = this.batches.get(batchId);
+        if(batch == null){
+            batch = new ArrayList<>();
+            for (int i=0; i< batchSize; i++) {
+                batch.add(createTuple());
+            }
+            this.batches.put(batchId, batch);
+        }
+        for(List<Object> list : batch){
+            collector.emit(list);
+        }
+    }
+
+    private List<Object> createTuple() {
+        final Random random = new Random();
+        List<Object> values = new ArrayList<Object>(){{
+            add(stationIds[random.nextInt(stationIds.length)]);
+            add(random.nextInt(100) + "");
+            add(UUID.randomUUID());}};
+        return values;
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return outputFields;
+    }
+
+    public int getRemainingBatches() {
+        return batches.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/resources/schema.cql
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/resources/schema.cql b/external/storm-cassandra/src/test/resources/schema.cql
index 04f8c45..1d131a4 100644
--- a/external/storm-cassandra/src/test/resources/schema.cql
+++ b/external/storm-cassandra/src/test/resources/schema.cql
@@ -16,9 +16,15 @@
 */
  
 CREATE TABLE temperature (
-    weatherstation_id   TEXT,
+    weather_station_id   TEXT,
+    weather_station_name   TEXT,
     event_time          TIMEUUID,
     temperature         TEXT,
-    PRIMARY KEY(weatherstation_id, event_time)
-)
+    PRIMARY KEY(weather_station_id, event_time)
+);
 
+CREATE TABLE station (
+    id TEXT,
+    name TEXT,
+    PRIMARY KEY(id)
+);
\ No newline at end of file