You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/02 00:49:23 UTC

[2/7] storm git commit: STORM-1075 add external module storm-cassandra

STORM-1075 add external module storm-cassandra


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/641300e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/641300e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/641300e2

Branch: refs/heads/master
Commit: 641300e2441d732fc2be98f05e70c6e5db4e4cf6
Parents: 437c4b1
Author: Florian Hussonnois <fl...@zenika.com>
Authored: Wed Oct 28 12:08:58 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Wed Nov 25 09:50:15 2015 +0100

----------------------------------------------------------------------
 external/storm-cassandra/README.md              | 180 +++++++++++++++++
 external/storm-cassandra/pom.xml                | 124 ++++++++++++
 .../AbstractExecutionResultHandler.java         |  60 ++++++
 .../cassandra/BaseExecutionResultHandler.java   |  85 ++++++++
 .../storm/cassandra/CassandraContext.java       |  92 +++++++++
 .../cassandra/DynamicStatementBuilder.java      | 199 +++++++++++++++++++
 .../storm/cassandra/ExecutionResultHandler.java |  98 +++++++++
 .../storm/cassandra/Murmur3StreamGrouping.java  |  89 +++++++++
 .../storm/cassandra/bolt/BaseCassandraBolt.java | 193 ++++++++++++++++++
 .../bolt/BatchCassandraWriterBolt.java          | 192 ++++++++++++++++++
 .../cassandra/bolt/CassandraWriterBolt.java     |  72 +++++++
 .../cassandra/bolt/GroupingBatchBuilder.java    |  68 +++++++
 .../bolt/PairBatchStatementTuples.java          |  52 +++++
 .../cassandra/bolt/PairStatementTuple.java      |  50 +++++
 .../storm/cassandra/client/CassandraConf.java   | 146 ++++++++++++++
 .../storm/cassandra/client/ClusterFactory.java  |  71 +++++++
 .../storm/cassandra/client/SimpleClient.java    |  42 ++++
 .../cassandra/client/SimpleClientProvider.java  |  35 ++++
 .../cassandra/client/impl/DefaultClient.java    | 123 ++++++++++++
 .../cassandra/context/BaseBeanFactory.java      |  65 ++++++
 .../storm/cassandra/context/BeanFactory.java    |  48 +++++
 .../storm/cassandra/context/WorkerCtx.java      |  89 +++++++++
 .../storm/cassandra/executor/AsyncExecutor.java | 152 ++++++++++++++
 .../executor/AsyncExecutorProvider.java         |  40 ++++
 .../cassandra/executor/AsyncResultHandler.java  |  64 ++++++
 .../executor/ExecutionResultCollector.java      |  99 +++++++++
 .../executor/impl/BatchAsyncResultHandler.java  |  73 +++++++
 .../executor/impl/SingleAsyncResultHandler.java |  72 +++++++
 .../query/BatchStatementTupleMapper.java        |  57 ++++++
 .../cassandra/query/CQLClauseTupleMapper.java   |  36 ++++
 .../cassandra/query/CQLStatementBuilder.java    |  31 +++
 .../query/CQLStatementTupleMapper.java          |  86 ++++++++
 .../cassandra/query/CQLTableTupleMapper.java    |  39 ++++
 .../cassandra/query/CQLValuesTupleMapper.java   |  74 +++++++
 .../storm/cassandra/query/ContextQuery.java     |  83 ++++++++
 .../query/SimpleCQLStatementTupleMapper.java    |  51 +++++
 .../query/impl/BoundStatementMapperBuilder.java | 107 ++++++++++
 .../query/impl/InsertStatementBuilder.java      | 153 ++++++++++++++
 .../query/impl/UpdateStatementBuilder.java      | 118 +++++++++++
 .../cassandra/query/selector/FieldSelector.java |  68 +++++++
 .../cassandra/DynamicStatementBuilderTest.java  | 133 +++++++++++++
 .../apache/storm/cassandra/WeatherSpout.java    |  84 ++++++++
 .../storm/cassandra/bolt/BaseTopologyTest.java  |  60 ++++++
 .../bolt/BatchCassandraWriterBoltTest.java      |  62 ++++++
 .../cassandra/bolt/CassandraWriterBoltTest.java |  63 ++++++
 .../src/test/resources/schema.cql               |   7 +
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  15 +-
 48 files changed, 4000 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
new file mode 100644
index 0000000..96454b6
--- /dev/null
+++ b/external/storm-cassandra/README.md
@@ -0,0 +1,180 @@
+Storm Cassandra Integration (CQL).
+-------------------
+
+[Apache Storm](https://storm.apache.org/) is a free and open source distributed realtime computation system.
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides core Storm bolts on top of Apache Cassandra.
+It also provides a simple DSL to map a Storm *Tuple* to a Cassandra Query Language *Statement*.
+
+
+### Configuration
+The following properties may be passed to storm configuration.
+
+| **Property name**                     | **Description** | **Default**  |
+| ------------------------------------- | ----------------| -------------|
+| **cassandra.keyspace**                | -               |              |
+| **cassandra.nodes**                   | -               | {"localhost"}|
+| **cassandra.username**                | -               | -            |
+| **cassandra.password**                | -               | -            |
+| **cassandra.port**                    | -               | 9042         |
+| **cassandra.output.consistencyLevel** | -               | ONE          |
+| **cassandra.batch.size.rows**         | -               | 100          |
+
+### CassandraWriterBolt
+
+#### Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+
+```
+
+#### Insert Query Builder
+##### Insert query including only the specified tuple fields.
+```java
+
+    new CassandraWriterBolt(
+        insertInto("album")
+            .values(
+                with(fields("title", "year", "performer", "genre", "tracks"))
+            ).build());
+```
+##### Insert query including all tuple fields.
+```java
+
+    new CassandraWriterBolt(
+        insertInto("album")
+            .values(all()).build());
+```
+
+##### Insert multiple queries from one input tuple.
+```java
+
+    new CassandraWriterBolt(
+        async(
+        insertInto("titles_per_album").values(all()),
+        insertInto("titles_per_performer").values(all())
+        )
+    );
+```
+
+##### Insert query including some fields with aliases
+```java
+
+    new CassandraWriterBolt(
+        insertInto("album")
+            .values(
+                with(field("ti").as("title"),
+                     field("ye").as("year"),
+                     field("pe").as("performer"),
+                     field("ge").as("genre"),
+                     field("tr").as("tracks"))
+            ).build());
+```
+
+##### Insert query with static bound query
+```java
+
+    new CassandraWriterBolt(
+         boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);")
+            .bind(all()));
+```
+
+##### Insert query with bound statement load from storm configuration
+```java
+
+    new CassandraWriterBolt(
+         boundQuery(named("insertIntoAlbum"))
+            .bind(all()));
+```
+
+##### Insert query with bound statement load from tuple field
+```java
+
+    new CassandraWriterBolt(
+         boundQuery(namedByField("cql"))
+            .bind(all()));
+```
+
+##### Insert query with batch statement
+```java
+
+    // Logged
+    new CassandraWriterBolt(loggedBatch(
+        insertInto("title_per_album").values(all()),
+        insertInto("title_per_performer").values(all())
+        )
+    );
+    // UnLogged
+    new CassandraWriterBolt(unLoggedBatch(
+        insertInto("title_per_album").values(all()),
+        insertInto("title_per_performer").values(all())
+        )
+    );
+```
+
+### How to handle query execution results
+
+The interface *ExecutionResultHandler* can be used to customize how an execution result should be handled.
+
+```java
+public interface ExecutionResultHandler extends Serializable {
+    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+
+    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+
+    void onQuerySuccess(OutputCollector collector, Tuple tuple);
+}
+```
+
+By default, the CassandraBolt fails a tuple on every Cassandra exception (see [BaseExecutionResultHandler](https://github.com/apache/storm/blob/master/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(all()).build())
+            .withResultHandler(new MyCustomResultHandler());
+```
+
+### Declare Output fields
+
+A CassandraBolt can declare output fields / stream output fields.
+For instance, this may be used to emit a new tuple on error, or to chain queries.
+
+```java
+    new CassandraWriterBolt(insertInto("album").values(all()).build())
+            .withResultHandler(new EmitOnDriverExceptionResultHandler())
+            .withStreamOutputFields("stream_error", new Fields("message"));
+
+    public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
+        @Override
+        protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
+            LOG.error("An error occurred while executing cassandra statement", e);
+            collector.emit("stream_error", new Values(e.getMessage()));
+            collector.ack(tuple);
+        }
+    }
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
new file mode 100644
index 0000000..86cd87d
--- /dev/null
+++ b/external/storm-cassandra/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <packaging>jar</packaging>
+    <artifactId>storm-cassandra</artifactId>
+    <name>storm-cassandra</name>
+    <description>Storm Bolts for Apache Cassandra</description>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <java.version>1.7</java.version>
+        <org.slf4j.version>1.7.6</org.slf4j.version>
+        <jackson.databind.version>2.3.2</jackson.databind.version>
+        <junit.version>4.11</junit.version>
+        <guava.version>16.0.1</guava.version>
+        <commons-lang3.version>3.3</commons-lang3.version>
+        <cassandra.driver.core.version>2.1.7.1</cassandra.driver.core.version>
+    </properties>
+
+    <developers>
+        <developer>
+            <id>fhuss</id>
+            <name>Florian Hussonnois</name>
+            <email>florian.hussonnois@gmail.com</email>
+            <url>https://github.com/fhussonnois</url>
+            <roles>
+                <role>developer</role>
+            </roles>
+        </developer>
+    </developers>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>${cassandra.driver.core.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${org.slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.cassandraunit</groupId>
+            <artifactId>cassandra-unit</artifactId>
+            <version>2.1.3.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
new file mode 100644
index 0000000..80ae284
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.datastax.driver.core.exceptions.WriteTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Abstract base for {@link ExecutionResultHandler} implementations: dispatches a failure to the
+ * exception-specific callback, or reports the error and fails the tuple when none applies.
+ */
+public abstract class AbstractExecutionResultHandler implements ExecutionResultHandler {
+
+    public static final Logger LOG = LoggerFactory.getLogger(AbstractExecutionResultHandler.class);
+
+    @Override
+    public void onThrowable(Throwable t, OutputCollector collector, Tuple i) {
+        if( t instanceof QueryValidationException) {
+            this.onQueryValidationException((QueryValidationException)t, collector, i);
+        } else if (t instanceof ReadTimeoutException) {
+            this.onReadTimeoutException((ReadTimeoutException)t, collector, i);
+        } else if (t instanceof WriteTimeoutException) {
+            this.onWriteTimeoutException((WriteTimeoutException) t, collector, i);
+        } else if (t instanceof UnavailableException) {
+            this.onUnavailableException((UnavailableException) t, collector, i);
+        } else { // any other throwable: report it and fail the tuple
+            collector.reportError(t);
+            collector.fail(i);
+        }
+    }
+
+    @Override
+    public void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl) {
+        for(Tuple i : tl) onThrowable(t, collector, i); // apply the single-tuple handling to each tuple
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
new file mode 100644
index 0000000..c7fc4f1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.*;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link ExecutionResultHandler} which fails the incoming tuple when a {@link com.datastax.driver.core.exceptions.DriverException} is thrown.
+ * The exception is then automatically reported to Storm.
+ *
+ */
+public class BaseExecutionResultHandler extends AbstractExecutionResultHandler {
+
+    private final static org.slf4j.Logger LOG = LoggerFactory.getLogger(BaseExecutionResultHandler.class);
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple) {
+        onDriverException(e, collector, tuple);
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple) {
+        onDriverException(e, collector, tuple);
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple) {
+        onDriverException(e, collector, tuple);
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple) {
+        onDriverException(e, collector, tuple);
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onQuerySuccess(OutputCollector collector, Tuple tuple) {
+        // No-op: this handler takes no action on success.
+    }
+
+    /**
+     * This method is called when one of the callback methods of the {@link BaseExecutionResultHandler} is not
+     * overridden. It is convenient if you want to funnel some or all of the callbacks into a single method.
+     *
+     * @param e the exception thrown
+     * @param collector the output collector
+     * @param tuple the tuple in failure
+     */
+    protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
+        LOG.error("An error occurred while executing cassandra statement", e);
+        collector.fail(tuple);
+        collector.reportError(e);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
new file mode 100644
index 0000000..3081b0d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.ClusterFactory;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.client.impl.DefaultClient;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+import org.apache.storm.cassandra.context.WorkerCtx;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * {@link WorkerCtx} that registers bean factories for {@link SimpleClient}, {@link CassandraConf} and {@link Cluster}, and serves clients from them.
+ */
+public class CassandraContext extends WorkerCtx implements SimpleClientProvider {
+
+    /**
+     * Creates a new {@link CassandraContext} instance.
+     */
+    public CassandraContext() {
+        register(SimpleClient.class, new ClientFactory());
+        register(CassandraConf.class, new CassandraConfFactory());
+        register(Cluster.class, new ClusterFactory());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public SimpleClient getClient(Map<String, Object> config) {
+        SimpleClient client = getWorkerBean(SimpleClient.class, config);
+        if (client.isClose() ) // NOTE(review): presumably the 'true' flag below forces a fresh bean - confirm against WorkerCtx
+            client = getWorkerBean(SimpleClient.class, config, true);
+        return client;
+    }
+
+    /**
+     * Simple class to make {@link CassandraConf} from a Storm topology configuration.
+     */
+    public static final class CassandraConfFactory extends BaseBeanFactory<CassandraConf> {
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected CassandraConf make(Map<String, Object> stormConf) {
+            return new CassandraConf(stormConf);
+        }
+    }
+
+    /**
+     * Simple class to make {@link SimpleClient} from a Storm topology configuration.
+     */
+    public static final class ClientFactory extends BaseBeanFactory<SimpleClient> {
+
+        private static final Logger LOG = LoggerFactory.getLogger(ClientFactory.class);
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected SimpleClient make(Map<String, Object> stormConf) {
+            Cluster cluster = this.context.getWorkerBean(Cluster.class, stormConf);
+            if( cluster.isClosed() ) {
+                LOG.warn("Cluster is closed - trigger new initialization!");
+                cluster = this.context.getWorkerBean(Cluster.class, stormConf, true);
+            }
+            CassandraConf config = this.context.getWorkerBean(CassandraConf.class, stormConf);
+            return new DefaultClient(cluster, config.getKeyspace());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
new file mode 100644
index 0000000..deea8da
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder;
+import org.apache.storm.cassandra.query.impl.InsertStatementBuilder;
+import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class DynamicStatementBuilder implements Serializable {
+
+    private DynamicStatementBuilder() { // every member is static, so instances are never created
+    }
+
+    /**
+     * Builds a new insert statement for the specified table.
+     *
+     * @param table the table's name.
+     * @return a new {@link InsertStatementBuilder} instance.
+     */
+    public static final InsertStatementBuilder insertInto(String table) {
+        return new InsertStatementBuilder(table);
+    }
+    /**
+     * Builds a new insert statement based on the specified CQL mapper.
+     *
+     * @param mapper the CQL mapper.
+     * @return a new {@link InsertStatementBuilder} instance.
+     */
+    public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) {
+        return new InsertStatementBuilder(mapper);
+    }
+    /**
+     * Builds a new insert statement for the specified keyspace and table.
+     *
+     * @param ks the keyspace to use.
+     * @param table the table's name.
+     * @return a new {@link InsertStatementBuilder} instance.
+     */
+    public static final InsertStatementBuilder insertInto(String ks, String table) {
+        return new InsertStatementBuilder(table, ks);
+    }
+
+    /**
+     * Builds a new update statement for the specified table.
+     *
+     * @param table the table's name.
+     * @return a new {@link UpdateStatementBuilder} instance.
+     */
+    public static final UpdateStatementBuilder update(String table) {
+        return new UpdateStatementBuilder(table);
+    }
+
+    /**
+     * Builds a new update statement for the specified keyspace and table.
+     * @param ks the keyspace to use.
+     * @param table the table's name.
+     * @return a new {@link UpdateStatementBuilder} instance.
+     */
+    public static final UpdateStatementBuilder update(String ks, String table) {
+        return new UpdateStatementBuilder(table, ks);
+    }
+
+    /**
+     * Builds a new bound statement based on the specified query.
+     *
+     * @param cql the query.
+     * @return a new {@link BoundStatementMapperBuilder} instance.
+     */
+    public static final BoundStatementMapperBuilder boundQuery(String cql) {
+        return new BoundStatementMapperBuilder(cql);
+    }
+
+    /**
+     * Builds a new bound statement identified by the given field.
+     *
+     * @param field a context used to resolve the cassandra query.
+     * @return a new {@link BoundStatementMapperBuilder} instance.
+     */
+    public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) {
+        return new BoundStatementMapperBuilder(field);
+    }
+
+    /**
+     * Builds multiple statements which will be executed asynchronously.
+     *
+     * @param builders a list of {@link CQLStatementBuilder}.
+     * @return a new {@link CQLStatementTupleMapper}.
+     */
+    public static final CQLStatementTupleMapper async(final CQLStatementBuilder... builders) {
+        return new CQLStatementTupleMapper.DynamicCQLStatementTupleMapper(Arrays.asList(builders));
+    }
+
+    /**
+     * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders.
+     */
+    public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
+        return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
+    }
+    /**
+     * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
+     */
+    public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
+        return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
+    }
+    /**
+     * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders.
+     */
+    public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
+        return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders);
+    }
+
+    private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
+        List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length);
+        for(CQLStatementBuilder b : Arrays.asList(builders)) // build each statement mapper, in argument order
+            mappers.add(b.build());
+        return new BatchStatementTupleMapper(type, mappers);
+    }
+
+    /**
+     * Retrieves from the storm configuration the specified named query.
+     *
+     * @param name query's name.
+     */
+    public static final ContextQuery named(final String name) {
+        return new ContextQuery.BoundQueryContext(name);
+    }
+
+    /**
+     * Retrieves from the storm configuration the named query specified by a tuple field.
+     *
+     * @param fieldName name of the tuple field that contains the name of the query.
+     */
+    public static final ContextQuery namedByField(final String fieldName) {
+        return new ContextQuery.BoundQueryNamedByFieldContext(fieldName);
+    }
+
+
+    /**
+     * Maps a CQL value to the specified field from an input tuple.
+     *
+     * @param name the name of a tuple field.
+     * @return a new {@link FieldSelector}.
+     */
+    public static final FieldSelector field(final String name) {
+        return new FieldSelector(name);
+    }
+
+    /**
+     * Maps CQL values to all specified fields from an input tuple.
+     *
+     * @param fields a list of tuple fields
+     * @return an array of {@link FieldSelector}, one per given field name.
+     */
+    public static final FieldSelector[] fields(final String... fields) {
+        int size = fields.length;
+        List<FieldSelector> fl = new ArrayList<>(size);
+        for(int i = 0 ; i < size; i++)
+                fl.add(new FieldSelector(fields[i]));
+        return fl.toArray(new FieldSelector[size]);
+    }
+
+    /**
+     * Includes only the specified tuple fields.
+     *
+     * @param fields a list of field selector.
+     */
+    public static final CQLValuesTupleMapper with(final FieldSelector... fields) {
+        return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields));
+    }
+
+    /**
+     * Includes all tuple fields.
+     */
+    public static final CQLValuesTupleMapper all() {
+        return new CQLValuesTupleMapper.AllTupleMapper();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
new file mode 100644
index 0000000..b804ee5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.datastax.driver.core.exceptions.WriteTimeoutException;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Strategy interface defining what to do when a query either succeeds or fails.
+ *
+ * One callback exists per handled cassandra exception type, plus a success callback and
+ * two generic callbacks taking any {@link Throwable}. The interface is {@link Serializable}.
+ */
+public interface ExecutionResultHandler extends Serializable {
+
+    /**
+     * Invoked when a {@link com.datastax.driver.core.exceptions.QueryValidationException} is thrown.
+     *
+     * @param e the cassandra exception.
+     * @param collector the storm collector.
+     * @param tuple an input tuple.
+     */
+    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+    /**
+     * Invoked when a {@link com.datastax.driver.core.exceptions.ReadTimeoutException} is thrown.
+     *
+     * @param e the cassandra exception.
+     * @param collector the storm collector.
+     * @param tuple an input tuple.
+     */
+    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+    /**
+     * Invoked when a {@link com.datastax.driver.core.exceptions.WriteTimeoutException} is thrown.
+     *
+     * @param e the cassandra exception.
+     * @param collector the storm collector.
+     * @param tuple an input tuple.
+     */
+    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+    /**
+     * Invoked when a {@link com.datastax.driver.core.exceptions.UnavailableException} is thrown.
+     *
+     * @param e the cassandra exception.
+     * @param collector the storm collector.
+     * @param tuple an input tuple.
+     */
+    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+    /**
+     * Invoked when a query has been executed successfully.
+     * This method is NOT responsible for acknowledging the input tuple.
+     *
+     * @param collector the storm collector.
+     * @param tuple an input tuple.
+     */
+    void onQuerySuccess(OutputCollector collector, Tuple tuple);
+
+    /**
+     * Generic callback used to handle any type of exception, for a single input tuple.
+     *
+     * @param t the thrown exception.
+     * @param collector the storm collector.
+     * @param i an input tuple.
+     */
+    void onThrowable(Throwable t, OutputCollector collector, Tuple i);
+
+    /**
+     * Generic callback used to handle any type of exception, for a list of input tuples.
+     *
+     * @param t the thrown exception.
+     * @param collector the storm collector.
+     * @param tl a list of input tuples.
+     */
+    void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
new file mode 100644
index 0000000..a3f6887
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.FailedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link backtype.storm.grouping.CustomStreamGrouping} that picks the target task of a tuple
+ * from the Murmur3 hash of its grouping values.
+ *
+ * This stream grouping may be used to optimise writes to Apache Cassandra.
+ */
+public class Murmur3StreamGrouping implements CustomStreamGrouping {
+
+    private List<Integer> targetTasks;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = targetTasks;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The hash of the values is truncated to an int and reduced modulo the number of
+     * target tasks; the absolute value of the remainder is the index of the chosen task.
+     */
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        final long hash;
+        try {
+            hash = hashes(values);
+        } catch (IOException e) {
+            throw new FailedException(e);
+        }
+        final int index = Math.abs(((int) hash) % targetTasks.size());
+        return Lists.newArrayList(targetTasks.get(index));
+    }
+
+    /**
+     * Computes the murmur3 hash for the specified values.
+     * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys
+     * https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+     *
+     * Each value is cast to {@link String} and serialised as a two-byte length, its UTF-8
+     * bytes and a trailing zero byte; the concatenation is then hashed.
+     *
+     * @param values the fields which are part of the (composite) partition key; each must be a String.
+     * @return the computed hash for input values.
+     * @throws java.io.IOException if the values cannot be serialised.
+     */
+    @VisibleForTesting
+    public static long hashes(List<Object> values) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (DataOutputStream data = new DataOutputStream(buffer)) {
+            for (Object value : values) {
+                final byte[] encoded = ((String) value).getBytes("UTF-8");
+                data.writeShort(encoded.length);
+                data.write(encoded, 0, encoded.length);
+                data.writeByte(0);
+            }
+            data.flush();
+        }
+        return Hashing.murmur3_128().hashBytes(buffer.toByteArray()).asLong();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
new file mode 100644
index 0000000..7211ad3
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import org.apache.storm.cassandra.BaseExecutionResultHandler;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A base cassandra bolt.
+ *
+ * Default {@link backtype.storm.topology.base.BaseRichBolt}
+ */
+public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
+
+    protected OutputCollector outputCollector;
+    
+    protected SimpleClientProvider clientProvider;
+    protected SimpleClient client;
+    protected Session session;
+    protected Map stormConfig;
+
+    protected CassandraConf cassandraConfConfig;
+
+    private CQLStatementTupleMapper mapper;
+    private ExecutionResultHandler resultHandler;
+
+    transient private  Map<String, Fields> outputsFields = new HashMap<>();
+
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     * @param mapper
+     */
+    public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) {
+        this.mapper = mapper;
+        this.clientProvider = clientProvider;
+    }
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     * @param tupleMapper
+     */
+    public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, new CassandraContext());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.outputCollector = outputCollector;
+        this.stormConfig = stormConfig;
+        this.cassandraConfConfig = new CassandraConf(stormConfig);
+        this.client = clientProvider.getClient(this.stormConfig);
+        try {
+            session = client.connect();
+        } catch (NoHostAvailableException e) {
+            outputCollector.reportError(e);
+        }
+    }
+
+    public BaseCassandraBolt withResultHandler(ExecutionResultHandler resultHandler) {
+        this.resultHandler = resultHandler;
+        return this;
+    }
+
+    public BaseCassandraBolt withOutputFields(Fields fields) {
+        this.outputsFields.put(null, fields);
+        return this;
+    }
+
+    public BaseCassandraBolt withStreamOutputFields(String stream, Fields fields) {
+        if( stream == null || stream.length() == 0) throw new IllegalArgumentException("'stream' should not be null");
+        this.outputsFields.put(stream, fields);
+        return this;
+    }
+
+    protected ExecutionResultHandler getResultHandler() {
+        if(resultHandler == null) resultHandler = new BaseExecutionResultHandler();
+        return resultHandler;
+    }
+
+    protected CQLStatementTupleMapper getMapper() {
+        return mapper;
+    }
+
+    abstract protected AsyncResultHandler<T> getAsyncHandler() ;
+
+    protected AsyncExecutor<T> getAsyncExecutor() {
+        return AsyncExecutorProvider.getLocal(session, getAsyncHandler());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @param input the tuple to process.
+     */
+    @Override
+    public final void execute(Tuple input) {
+        getAsyncHandler().flush(outputCollector);
+        if (TupleUtils.isTick(input)) {
+            tick();
+            outputCollector.ack(input);
+        } else {
+            process(input);
+        }
+    }
+
+    /**
+     * Process a single tuple of input.
+     *
+     * @param input The input tuple to be processed.
+     */
+    abstract protected void process(Tuple input);
+
+    /**
+     * Calls by an input tick tuple.
+     */
+    abstract protected void tick();
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        Fields fields = this.outputsFields.remove(null);
+        if( fields != null) declarer.declare(fields);
+        for(Map.Entry<String, Fields> entry : this.outputsFields.entrySet()) {
+            declarer.declareStream(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        // add tick tuple each second to force acknowledgement of pending tuples.
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
+        return conf;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void cleanup() {
+        getAsyncExecutor().shutdown();
+        getAsyncHandler().flush(outputCollector);
+        client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
new file mode 100644
index 0000000..c4c0110
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cassandra bolt that queues input tuples and writes them as batch statements.
+ *
+ * Tuples are queued by {@code process} and flushed by {@link #prepareAndExecuteStatement()},
+ * which runs on every tick tuple and whenever the queue is full.
+ */
+public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
+
+    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+    public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+    private static final int QUEUE_MAX_SIZE = 1000;
+
+    private LinkedBlockingQueue<Tuple> queue;
+
+    private int tickFrequencyInSeconds;
+
+    private long lastModifiedTimesMillis;
+
+    private String componentID;
+
+    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
+
+    /**
+     * Creates a new {@link BatchCassandraWriterBolt} instance using {@link #DEFAULT_EMIT_FREQUENCY}
+     * as tick frequency.
+     *
+     * @param tupleMapper the mapper used to build statements from input tuples.
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+    }
+
+    /**
+     * Creates a new {@link BatchCassandraWriterBolt} instance.
+     *
+     * @param tupleMapper the mapper used to build statements from input tuples.
+     * @param tickFrequencyInSeconds the tick tuple frequency, in seconds, set in the component configuration.
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+        super(tupleMapper);
+        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(stormConfig, topologyContext, outputCollector);
+        this.componentID = topologyContext.getThisComponentId();
+        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+        this.lastModifiedTimesMillis = now();
+    }
+
+    /**
+     * Returns the batch result handler, creating it on first use.
+     */
+    @Override
+    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
+        if( asyncResultHandler == null) {
+            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
+        }
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * When the queue is full, the queued tuples are flushed first and the input is queued afterwards.
+     */
+    @Override
+    protected void process(Tuple input) {
+        if( ! queue.offer(input) ) {
+            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
+            prepareAndExecuteStatement();
+            queue.add(input);
+        }
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
+        prepareAndExecuteStatement();
+    }
+
+    /**
+     * Drains the queue, maps the drained tuples to statements, groups them into batches
+     * and submits each batch to the async executor.
+     *
+     * Any {@link Throwable} raised while doing so is logged and passed, together with the
+     * drained tuples, to the async handler's {@code failure} callback.
+     */
+    public void prepareAndExecuteStatement() {
+        int size = queue.size();
+        if( size > 0 ) {
+            List<Tuple> inputs = new ArrayList<>(size);
+            queue.drainTo(inputs);
+            try {
+                List<PairStatementTuple> psl = buildStatement(inputs);
+
+                int sinceLastModified = updateAndGetSecondsSinceLastModified();
+                LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified));
+
+                checkTimeElapsedSinceLastExec(sinceLastModified);
+
+                GroupingBatchBuilder batchBuilder = new GroupingBatchBuilder(cassandraConfConfig.getBatchSizeRows(), psl);
+
+                int batchSize = 0;
+                for (PairBatchStatementTuples batch : batchBuilder) {
+                    LOG.debug(logPrefix() + String.format("Writing data to %s in batches of %s rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size()));
+                    getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
+                    batchSize++;
+                }
+
+                int pending = getAsyncExecutor().getPendingExec();
+                if (pending > batchSize) {
+                    LOG.warn( logPrefix() + String.format("Currently pending tasks is superior to the number of submit batches(%s) : %s", batchSize, pending));
+                }
+
+            } catch (Throwable r) {
+                // Pass the throwable to the logger so that the stack trace is not lost.
+                LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements", r);
+                getAsyncHandler().failure(r, inputs);
+            }
+        }
+    }
+
+    /**
+     * Maps every input tuple to its statements and pairs each statement with its tuple.
+     */
+    private List<PairStatementTuple> buildStatement(List<Tuple> inputs) {
+        List<PairStatementTuple> stmts = new ArrayList<>(inputs.size());
+
+        for(Tuple t : inputs) {
+            List<Statement> sl = getMapper().map(stormConfig, session, t);
+            for(Statement s : sl)
+                stmts.add(new PairStatementTuple(t, s) );
+        }
+        return stmts;
+    }
+
+    /**
+     * Logs a warning when more time than the tick frequency elapsed since the last execution.
+     */
+    private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
+        if(sinceLastModified > tickFrequencyInSeconds)
+            LOG.warn( logPrefix() + String.format("The time elapsed since last execution exceeded tick tuple frequency - %d > %d seconds", sinceLastModified, tickFrequencyInSeconds));
+    }
+
+    private String logPrefix() {
+        return componentID + " - ";
+    }
+
+    /**
+     * Sets the tick tuple frequency; the given duration is converted to whole seconds.
+     *
+     * @param time the duration between two tick tuples.
+     * @param unit the unit of {@code time}.
+     * @return this bolt.
+     */
+    public BatchCassandraWriterBolt withTickFrequency(long time, TimeUnit unit) {
+        this.tickFrequencyInSeconds = (int)unit.toSeconds(time);
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
+        return conf;
+    }
+
+    /**
+     * Returns the whole seconds elapsed since the previous call (or since {@code prepare})
+     * and resets the reference time to now.
+     */
+    private int updateAndGetSecondsSinceLastModified() {
+        long now = now();
+        // Divide the long delta first and narrow afterwards, so the cast cannot overflow.
+        int seconds = (int) ((now - lastModifiedTimesMillis) / 1000);
+        lastModifiedTimesMillis = now;
+        return seconds;
+    }
+
+    private long now() {
+        return Time.currentTimeMillis();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
new file mode 100644
index 0000000..663f26a
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.SingleAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.List;
+
+/**
+ * Cassandra bolt that submits the statements of each input tuple as soon as the tuple is processed.
+ */
+public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> {
+
+    private AsyncResultHandler<Tuple> asyncResultHandler;
+
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     *
+     * @param tupleMapper the mapper used to build statements from input tuples.
+     */
+    public CassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        super(tupleMapper);
+    }
+
+    /**
+     * Returns the single-tuple result handler, creating it on first use.
+     */
+    @Override
+    protected AsyncResultHandler<Tuple> getAsyncHandler() {
+        if (asyncResultHandler != null) {
+            return asyncResultHandler;
+        }
+        asyncResultHandler = new SingleAsyncResultHandler(getResultHandler());
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * A single mapped statement is submitted on its own; any other number of statements
+     * is submitted as a list.
+     */
+    @Override
+    protected void process(Tuple input) {
+        final List<Statement> mapped = getMapper().map(stormConfig, session, input);
+        if (mapped.size() != 1) {
+            getAsyncExecutor().execAsync(mapped, input);
+        } else {
+            getAsyncExecutor().execAsync(mapped.get(0), input);
+        }
+    }
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
+        // Nothing to do on a tick tuple.
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
new file mode 100644
index 0000000..ea63b3d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Groups statement/tuple pairs into unlogged batch statements holding at most
+ * {@code batchSizeRows} statements each.
+ */
+public class GroupingBatchBuilder implements Iterable<PairBatchStatementTuples> {
+
+    private int batchSizeRows;
+
+    private List<PairStatementTuple> statements;
+
+    /**
+     * Creates a new  {@link GroupingBatchBuilder} instance.
+     *
+     * @param batchSizeRows the size of each partition of statements.
+     * @param statements the statement/tuple pairs to group.
+     */
+    public GroupingBatchBuilder(int batchSizeRows, List<PairStatementTuple> statements) {
+        this.statements = statements;
+        this.batchSizeRows = batchSizeRows;
+    }
+
+    /**
+     * Partitions the statements and returns an iterator over the resulting batches.
+     */
+    @Override
+    public Iterator<PairBatchStatementTuples> iterator() {
+        final Iterable<List<PairStatementTuple>> chunks = Iterables.partition(statements, batchSizeRows);
+        return Iterables.transform(chunks, new BatchAssembler()).iterator();
+    }
+
+    /**
+     * Turns one partition of pairs into an unlogged batch statement plus the list of its tuples.
+     */
+    private static final class BatchAssembler implements Function<List<PairStatementTuple>, PairBatchStatementTuples> {
+        @Override
+        public PairBatchStatementTuples apply(List<PairStatementTuple> chunk) {
+            final BatchStatement unlogged = new BatchStatement(BatchStatement.Type.UNLOGGED);
+            final List<Tuple> tuples = new LinkedList<>();
+            for (PairStatementTuple pair : chunk) {
+                unlogged.add(pair.getStatement());
+                tuples.add(pair.getTuple());
+            }
+            return new PairBatchStatementTuples(tuples, unlogged);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
new file mode 100644
index 0000000..736c482
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+
+import java.util.List;
+
+/**
+ * Immutable holder pairing a single batch statement with the list of tuples attached to it.
+ */
+public class PairBatchStatementTuples {
+
+    private final List<Tuple> inputs;
+
+    private final BatchStatement statement;
+
+    /**
+     * Creates a new {@link PairBatchStatementTuples} instance.
+     *
+     * @param inputs the tuples attached to the batch; the list is stored as given, not copied.
+     * @param statement the batch statement.
+     */
+    public PairBatchStatementTuples(List<Tuple> inputs, BatchStatement statement) {
+        this.statement = statement;
+        this.inputs = inputs;
+    }
+
+    /**
+     * @return the tuples attached to the batch.
+     */
+    public List<Tuple> getInputs() {
+        return this.inputs;
+    }
+
+    /**
+     * @return the batch statement.
+     */
+    public BatchStatement getStatement() {
+        return this.statement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
new file mode 100644
index 0000000..8f50574
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.Statement;
+
+/**
+ * Value holder associating a single {@link Tuple} with a single {@link Statement}.
+ */
+public class PairStatementTuple {
+
+    private final Statement statement;
+
+    private final Tuple tuple;
+
+    /**
+     * Builds a new {@link PairStatementTuple} pair.
+     *
+     * @param tuple     the tuple to pair with the statement.
+     * @param statement the statement to pair with the tuple.
+     */
+    public PairStatementTuple(Tuple tuple, Statement statement) {
+        this.statement = statement;
+        this.tuple = tuple;
+    }
+
+    /**
+     * @return the tuple given to the constructor.
+     */
+    public Tuple getTuple() {
+        return this.tuple;
+    }
+
+    /**
+     * @return the statement given to the constructor.
+     */
+    public Statement getStatement() {
+        return this.statement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
new file mode 100644
index 0000000..ccee468
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration used by cassandra storm components.
+ * <p>
+ * Values are either the built-in defaults (no-arg constructor) or read from a
+ * storm configuration map using the {@code CASSANDRA_*} keys declared below.
+ */
+public class CassandraConf implements Serializable {
+
+    public static final String CASSANDRA_USERNAME           = "cassandra.username";
+    public static final String CASSANDRA_PASSWORD           = "cassandra.password";
+    public static final String CASSANDRA_KEYSPACE           = "cassandra.keyspace";
+    public static final String CASSANDRA_CONSISTENCY_LEVEL  = "cassandra.output.consistencyLevel";
+    public static final String CASSANDRA_NODES              = "cassandra.nodes";
+    public static final String CASSANDRA_PORT               = "cassandra.port";
+    public static final String CASSANDRA_BATCH_SIZE_ROWS    = "cassandra.batch.size.rows";
+
+    /**
+     * Contact point used when {@link #CASSANDRA_NODES} is not configured.
+     */
+    private static final String DEFAULT_NODES = "localhost";
+    /**
+     * Port used when {@link #CASSANDRA_PORT} is not configured. Shared by both
+     * constructors so that they cannot disagree on the default.
+     */
+    private static final int DEFAULT_PORT = 9042;
+    /**
+     * Batch size used when {@link #CASSANDRA_BATCH_SIZE_ROWS} is not configured.
+     */
+    private static final int DEFAULT_BATCH_SIZE_ROWS = 100;
+
+    /**
+     * The authorized cassandra username.
+     */
+    private String username;
+    /**
+     * The authorized cassandra password
+     */
+    private String password;
+    /**
+     * The cassandra keyspace.
+     */
+    private String keyspace;
+    /**
+     * List of contacts nodes.
+     */
+    private String[] nodes = {DEFAULT_NODES};
+
+    /**
+     * The port used to connect to nodes.
+     */
+    private int port = DEFAULT_PORT;
+
+    /**
+     * Consistency level used to write statements.
+     */
+    private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+    /**
+     * The maximal numbers of rows per batch.
+     */
+    private int batchSizeRows = DEFAULT_BATCH_SIZE_ROWS;
+
+    /**
+     * Creates a new {@link CassandraConf} instance holding only default values
+     * (no credentials and no keyspace).
+     */
+    public CassandraConf() {
+        super();
+    }
+
+    /**
+     * Creates a new {@link CassandraConf} instance.
+     *
+     * @param conf The storm configuration.
+     * @throws IllegalArgumentException if {@link #CASSANDRA_KEYSPACE} is missing, if the
+     *         consistency level is not a valid {@link ConsistencyLevel} name, or if the
+     *         port / batch size cannot be parsed as an integer.
+     */
+    public CassandraConf(Map<String, Object> conf) {
+        this.username = getOrElse(conf, CASSANDRA_USERNAME, null);
+        this.password = getOrElse(conf, CASSANDRA_PASSWORD, null);
+        this.keyspace = get(conf, CASSANDRA_KEYSPACE);
+        this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+        this.nodes    = splitNodes(getOrElse(conf, CASSANDRA_NODES, DEFAULT_NODES));
+        // Numeric settings may arrive as Integer, Long or String depending on how the
+        // configuration was built, so they are converted rather than cast.
+        this.batchSizeRows = getInt(conf, CASSANDRA_BATCH_SIZE_ROWS, DEFAULT_BATCH_SIZE_ROWS);
+        this.port = getInt(conf, CASSANDRA_PORT, DEFAULT_PORT);
+    }
+
+    /**
+     * @return the configured username, or {@code null} if none was set.
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * @return the configured password, or {@code null} if none was set.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @return the configured keyspace.
+     */
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+    /**
+     * @return the contact nodes.
+     */
+    public String[] getNodes() {
+        return nodes;
+    }
+
+    /**
+     * @return the consistency level used to write statements.
+     */
+    public ConsistencyLevel getConsistencyLevel() {
+        return consistencyLevel;
+    }
+
+    /**
+     * @return the maximal number of rows per batch.
+     */
+    public int getBatchSizeRows() {
+        return batchSizeRows;
+    }
+
+    /**
+     * @return the port used to connect to nodes.
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Returns the mandatory value bound to the given key.
+     *
+     * @throws IllegalArgumentException if the key is absent or bound to {@code null}.
+     */
+    @SuppressWarnings("unchecked") // the caller chooses T; the map only stores Object
+    private <T> T get(Map<String, Object> conf, String key) {
+        Object o = conf.get(key);
+        if (o == null) {
+            throw new IllegalArgumentException("No '" + key + "' value found in configuration!");
+        }
+        return (T) o;
+    }
+
+    /**
+     * Returns the value bound to the given key, or {@code def} when it is absent or {@code null}.
+     */
+    @SuppressWarnings("unchecked") // the caller chooses T; the map only stores Object
+    private <T> T getOrElse(Map<String, Object> conf, String key, T def) {
+        T o = (T) conf.get(key);
+        return (o == null) ? def : o;
+    }
+
+    /**
+     * Reads an integer setting, accepting any {@link Number} as well as a numeric string.
+     *
+     * @throws NumberFormatException if the value is neither a number nor a parsable string.
+     */
+    private static int getInt(Map<String, Object> conf, String key, int def) {
+        Object o = conf.get(key);
+        if (o == null) {
+            return def;
+        }
+        if (o instanceof Number) {
+            return ((Number) o).intValue();
+        }
+        return Integer.parseInt(o.toString().trim());
+    }
+
+    /**
+     * Splits a comma-separated list of contact points, dropping the whitespace
+     * around each entry so that {@code "host1, host2"} yields usable host names.
+     */
+    private static String[] splitNodes(String commaSeparated) {
+        String[] hosts = commaSeparated.split(",");
+        for (int i = 0; i < hosts.length; i++) {
+            hosts[i] = hosts[i].trim();
+        }
+        return hosts;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("username", username)
+                // never print the clear-text password: this string typically ends up in logs
+                .add("password", password == null ? null : "******")
+                .add("keyspace", keyspace)
+                // print the array content rather than its identity hash
+                .add("nodes", java.util.Arrays.toString(nodes))
+                .add("port", port)
+                .add("consistencyLevel", consistencyLevel)
+                .add("batchSizeRows", batchSizeRows)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
new file mode 100644
index 0000000..886f6d3
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default factory building a cassandra {@link Cluster} from a Storm topology configuration.
+ */
+public class ClusterFactory extends BaseBeanFactory<Cluster> {
+
+    /**
+     * Creates a new Cluster based on the specified configuration.
+     * <p>
+     * Credentials are only applied when both the username and the password are non-empty.
+     *
+     * @param stormConf the storm configuration.
+     * @return a new {@link com.datastax.driver.core.Cluster} instance.
+     */
+    @Override
+    protected Cluster make(Map<String, Object> stormConf) {
+        final CassandraConf conf = new CassandraConf(stormConf);
+
+        final Cluster.Builder builder = Cluster.builder()
+                .withoutJMXReporting()
+                .withoutMetrics()
+                .addContactPoints(conf.getNodes())
+                .withPort(conf.getPort())
+                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
+                .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)))
+                .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
+
+        final String user = conf.getUsername();
+        final String secret = conf.getPassword();
+        final boolean hasCredentials = StringUtils.isNotEmpty(user) && StringUtils.isNotEmpty(secret);
+        if (hasCredentials) {
+            builder.withAuthProvider(new PlainTextAuthProvider(user, secret));
+        }
+
+        builder.withQueryOptions(new QueryOptions().setConsistencyLevel(conf.getConsistencyLevel()));
+
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
new file mode 100644
index 0000000..3bd80ed
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Session;
+
+/**
+ * Minimal cassandra client contract: open a {@link Session}, close the underlying
+ * cluster and check whether it has been closed.
+ */
+public interface SimpleClient {
+
+    /**
+     * Creates a new session on this cluster.
+     *
+     * @return a new session on this cluster.
+     * @throws com.datastax.driver.core.exceptions.NoHostAvailableException if we cannot reach any cassandra contact points.
+     */
+    Session connect();
+
+    /**
+     * Close the underlying {@link com.datastax.driver.core.Cluster} instance.
+     */
+    void close();
+
+    /**
+     * Checks whether the underlying cluster instance is closed.
+     *
+     * @return {@code true} if the underlying cluster instance is closed.
+     */
+    boolean isClose();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
new file mode 100644
index 0000000..412cb70
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import java.util.Map;
+
+/**
+ * Provider of {@link SimpleClient} instances built from a storm configuration.
+ */
+public interface SimpleClientProvider {
+
+    /**
+     * Creates a new cassandra client based on the specified storm configuration.
+     *
+     * @param config The configuration passed to the storm topology.
+     * @return a new {@link SimpleClient} instance.
+     */
+    SimpleClient getClient(Map<String, Object> config);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
new file mode 100644
index 0000000..8ed9293
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
+ * <p>
+ * A single {@link Session} bound to the configured keyspace is opened on the first
+ * call to {@link #connect()} and handed back on subsequent calls.
+ */
+public class DefaultClient implements SimpleClient, Closeable, Serializable {
+
+    private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
+
+    private String keyspace;
+
+    private Cluster cluster;
+
+    private Session session;
+
+    /**
+     * Create a new {@link DefaultClient} instance.
+     *
+     * @param cluster a cassandra cluster client; must not be {@code null}.
+     * @param keyspace the keyspace the session is connected to.
+     * @throws NullPointerException if {@code cluster} is {@code null}.
+     */
+    public DefaultClient(Cluster cluster, String keyspace) {
+        Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
+        this.cluster = cluster;
+        this.keyspace = keyspace;
+    }
+
+    /**
+     * @return all hosts listed in the cluster metadata.
+     */
+    public Set<Host> getAllHosts() {
+        Metadata metadata = getMetadata();
+        return metadata.getAllHosts();
+    }
+
+    /**
+     * @return the metadata of the wrapped cluster.
+     */
+    public Metadata getMetadata() {
+        return cluster.getMetadata();
+    }
+
+    /**
+     * Returns the name of the calling thread, used to identify the caller in log messages.
+     */
+    private String getExecutorName() {
+        Thread thread = Thread.currentThread();
+        return thread.getName();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The first call opens a session on the configured keyspace; later calls return
+     * the same session, re-opening it on the same keyspace if it has been closed.
+     */
+    @Override
+    public synchronized Session connect() throws NoHostAvailableException {
+        if (isDisconnected()) {
+            LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName()));
+            for (Host host : getAllHosts()) {
+                LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()));
+            }
+
+            LOG.info(String.format("Connect to cluster using keyspace %s", keyspace));
+            session = cluster.connect(keyspace);
+        } else {
+            LOG.warn(String.format("%s - Already connected to cluster: %s", getExecutorName(), cluster.getClusterName()));
+        }
+
+        if (session.isClosed()) {
+            LOG.warn("Session has been closed - create new one!");
+            // Re-open on the configured keyspace: Cluster#newSession() would hand back a
+            // session that is not bound to any keyspace, unlike the one created above.
+            this.session = cluster.connect(keyspace);
+        }
+        return session;
+    }
+
+    /**
+     * Checks whether no session has been opened yet, i.e. {@link #connect()} was never called.
+     */
+    protected boolean isDisconnected() {
+        return session == null;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close() {
+        if (cluster != null && !cluster.isClosed()) {
+            LOG.info(String.format("Try to close connection to cluster: %s", cluster.getClusterName()));
+            // The session only exists once connect() has been called.
+            if (session != null) {
+                session.close();
+            }
+            cluster.close();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isClose() {
+        return this.cluster.isClosed();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java
new file mode 100644
index 0000000..1ce6bac
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.context;
+
+import java.util.Map;
+
+/**
+ * Base {@link BeanFactory} implementation which creates its bean on first
+ * request and then keeps returning that same instance.
+ */
+public abstract class BaseBeanFactory<T> implements BeanFactory<T> {
+
+    protected WorkerCtx context;
+
+    protected volatile T instance;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setStormContext(WorkerCtx context) {
+        this.context = context;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The bean is built through {@link #make(Map)} the first time and cached afterwards.
+     */
+    @Override
+    public synchronized T get(Map<String, Object> stormConf) {
+        if (instance == null) {
+            instance = make(stormConf);
+        }
+        return instance;
+    }
+
+    /**
+     * Return a new instance of T.
+     *
+     * @param stormConf the storm configuration.
+     */
+    protected abstract T make(final Map<String, Object> stormConf);
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The new factory is created reflectively from the runtime class of this factory
+     * (which therefore needs an accessible no-arg constructor) and shares its context.
+     */
+    @Override
+    public BeanFactory<T> newInstance() {
+        final Class<? extends BaseBeanFactory> type = this.getClass();
+        final BaseBeanFactory copy;
+        try {
+            copy = type.newInstance();
+        } catch (IllegalAccessException | InstantiationException e) {
+            throw new RuntimeException("Cannot create a new instance of " + type.getSimpleName(), e);
+        }
+        copy.setStormContext(this.context);
+        return copy;
+    }
+}
\ No newline at end of file