You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/02 00:49:27 UTC

[6/7] storm git commit: STORM-1075 clean/refactor external module storm-cassandra

STORM-1075 clean/refactor external module storm-cassandra


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5565c438
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5565c438
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5565c438

Branch: refs/heads/master
Commit: 5565c43817deb2f3d7b77b5f6975dd3d16c802c4
Parents: 641300e
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Wed Nov 18 18:20:21 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Tue Dec 1 22:36:17 2015 +0100

----------------------------------------------------------------------
 external/storm-cassandra/README.md              | 36 +++++++---
 .../storm/cassandra/Murmur3StreamGrouping.java  | 46 +++++++++++-
 .../storm/cassandra/bolt/BaseCassandraBolt.java |  9 +--
 .../bolt/BatchCassandraWriterBolt.java          | 27 ++++---
 .../cassandra/bolt/CassandraWriterBolt.java     |  2 +-
 .../storm/cassandra/client/CassandraConf.java   | 74 +++++++++++++++++---
 .../storm/cassandra/client/ClusterFactory.java  |  6 +-
 .../cassandra/client/impl/DefaultClient.java    | 10 +--
 .../storm/cassandra/executor/AsyncExecutor.java | 15 ++--
 .../bolt/BatchCassandraWriterBoltTest.java      |  3 +-
 .../cassandra/bolt/CassandraWriterBoltTest.java |  3 +-
 11 files changed, 182 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 96454b6..5337b29 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -12,15 +12,18 @@ Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*
 ### Configuration
 The following properties may be passed to storm configuration.
 
-| **Property name**                     | **Description** | **Default**  |
-| ------------------------------------- | ----------------| -------------|
-| **cassandra.keyspace**                | -               |              |
-| **cassandra.nodes**                   | -               | {"localhost"}|
-| **cassandra.username**                | -               | -            |
-| **cassandra.password**                | -               | -            |
-| **cassandra.port**                    | -               | 9092         |
-| **cassandra.output.consistencyLevel** | -               | ONE          |
-| **cassandra.batch.size.rows**         | -               | 100          |
+| **Property name**                            | **Description** | **Default**         |
+| ---------------------------------------------| ----------------| --------------------|
+| **cassandra.keyspace**                       | -               |                     |
+| **cassandra.nodes**                          | -               | {"localhost"}       |
+| **cassandra.username**                       | -               | -                   |
+| **cassandra.password**                       | -               | -                   |
+| **cassandra.port**                           | -               | 9092                |
+| **cassandra.output.consistencyLevel**        | -               | ONE                 |
+| **cassandra.batch.size.rows**                | -               | 100                 |
+| **cassandra.retryPolicy**                    | -               | DefaultRetryPolicy  |
+| **cassandra.reconnectionPolicy.baseDelayMs** | -               | 100 (ms)            |
+| **cassandra.reconnectionPolicy.maxDelayMs**  | -               | 60000 (ms)          |
 
 ### CassandraWriterBolt
 
@@ -160,6 +163,21 @@ For instance, this may be used to remit a new tuple on error, or to chain querie
     }
 ```
 
+### Murmur3FieldGrouping
+
+[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java)  can be used to optimise cassandra writes.
+The stream is partitioned among the bolt's tasks based on the specified row partition keys.
+
+```java
+CassandraWriterBolt bolt = new CassandraWriterBolt(
+    insertInto("album")
+        .values(
+            with(fields("title", "year", "performer", "genre", "tracks")
+            ).build());
+builder.setBolt("BOLT_WRITER", bolt, 4)
+        .customGrouping("spout", new Murmur3StreamGrouping("title"))
+```
+
 ## License
 
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
index a3f6887..966aacd 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -22,13 +22,18 @@ import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.WorkerTopologyContext;
 import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Fields;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -41,12 +46,43 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping {
 
     private List<Integer> targetTasks;
 
+    private List<Integer> partitionKeyIndexes;
+
+    /**
+     * A list of partition key. The order of specified keys will be used to generate the partition key hash.
+     * It should respect the column order defined into the targeted CQL table.
+     */
+    private List<String> partitionKeyNames;
+
+    /**
+     * Creates a new {@link Murmur3StreamGrouping} instance.
+     * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}.
+     */
+    public Murmur3StreamGrouping(String...partitionKeyNames) {
+        this( Arrays.asList(partitionKeyNames));
+    }
+
+    /**
+     * Creates a new {@link Murmur3StreamGrouping} instance.
+     * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}.
+     */
+    public Murmur3StreamGrouping(List<String> partitionKeyNames) {
+        this.partitionKeyNames = partitionKeyNames;
+    }
+
+
     /**
      * {@inheritDoc}
      */
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         this.targetTasks = targetTasks;
+
+        this.partitionKeyIndexes = new ArrayList<>();
+        Fields componentOutputFields = context.getComponentOutputFields(stream);
+        for (String partitionKeyName : partitionKeyNames) {
+            partitionKeyIndexes.add(componentOutputFields.fieldIndex(partitionKeyName));
+        }
     }
 
     /**
@@ -55,13 +91,21 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping {
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
         try {
-            int n = Math.abs( (int) hashes(values) % targetTasks.size() );
+            int n = Math.abs( (int) hashes(getKeyValues(values)) % targetTasks.size() );
             return Lists.newArrayList(targetTasks.get(n));
         } catch (IOException e) {
             throw new FailedException(e);
         }
     }
 
+    private List<Object> getKeyValues(List<Object> values) {
+        List<Object> keys = new ArrayList<>();
+        for(Integer idx : partitionKeyIndexes) {
+            keys.add(values.get(idx));
+        }
+        return keys;
+    }
+
     /**
      * Computes the murmur3 hash for the specified values.
      * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
index 7211ad3..dafcb22 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -26,6 +26,7 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.TupleUtils;
+import backtype.storm.utils.Utils;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import org.apache.storm.cassandra.BaseExecutionResultHandler;
@@ -105,7 +106,7 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
     }
 
     public BaseCassandraBolt withOutputFields(Fields fields) {
-        this.outputsFields.put(null, fields);
+        this.outputsFields.put(Utils.DEFAULT_STREAM_ID, fields);
         return this;
     }
 
@@ -139,7 +140,7 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
     public final void execute(Tuple input) {
         getAsyncHandler().flush(outputCollector);
         if (TupleUtils.isTick(input)) {
-            tick();
+            onTickTuple();
             outputCollector.ack(input);
         } else {
             process(input);
@@ -156,14 +157,14 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
     /**
      * Calls by an input tick tuple.
      */
-    abstract protected void tick();
+    abstract protected void onTickTuple();
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        Fields fields = this.outputsFields.remove(null);
+        Fields fields = this.outputsFields.remove(Utils.DEFAULT_STREAM_ID);
         if( fields != null) declarer.declare(fields);
         for(Map.Entry<String, Fields> entry : this.outputsFields.entrySet()) {
             declarer.declareStream(entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
index c4c0110..fd597df 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -42,14 +42,14 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
 
     public static final int DEFAULT_EMIT_FREQUENCY = 2;
 
-    private static final int QUEUE_MAX_SIZE = 1000;
-
     private LinkedBlockingQueue<Tuple> queue;
     
     private int tickFrequencyInSeconds;
     
     private long lastModifiedTimesMillis;
 
+    private int batchMaxSize = 1000;
+
     private String componentID;
 
     private AsyncResultHandler<List<Tuple>> asyncResultHandler;
@@ -79,7 +79,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
     public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
         super.prepare(stormConfig, topologyContext, outputCollector);
         this.componentID = topologyContext.getThisComponentId();
-        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+        this.queue = new LinkedBlockingQueue<>(batchMaxSize);
         this.lastModifiedTimesMillis = now();
     }
 
@@ -106,7 +106,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
      * {@inheritDoc}
      */
     @Override
-    protected void tick() {
+    protected void onTickTuple() {
         prepareAndExecuteStatement();
     }
 
@@ -119,7 +119,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
                 List<PairStatementTuple> psl = buildStatement(inputs);
 
                 int sinceLastModified = updateAndGetSecondsSinceLastModified();
-                LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified));
+                LOG.debug(logPrefix() + "Execute cql batches with {} statements after {} seconds", size, sinceLastModified);
 
                 checkTimeElapsedSinceLastExec(sinceLastModified);
 
@@ -127,14 +127,14 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
 
                 int batchSize = 0;
                 for (PairBatchStatementTuples batch : batchBuilder) {
-                    LOG.debug(logPrefix() + String.format("Writing data to %s in batches of %s rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size()));
+                    LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size());
                     getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
                     batchSize++;
                 }
 
-                int pending = getAsyncExecutor().getPendingExec();
+                int pending = getAsyncExecutor().getPendingTasksSize();
                 if (pending > batchSize) {
-                    LOG.warn( logPrefix() + String.format("Currently pending tasks is superior to the number of submit batches(%s) : %s", batchSize, pending));
+                    LOG.warn( logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize, pending);
                 }
                 
             } catch (Throwable r) {
@@ -157,7 +157,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
 
     private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
         if(sinceLastModified > tickFrequencyInSeconds)
-            LOG.warn( logPrefix() + String.format("The time elapsed since last execution exceeded tick tuple frequency - %d > %d seconds", sinceLastModified, tickFrequencyInSeconds));
+            LOG.warn( logPrefix() + "The time elapsed since last execution exceeded tick tuple frequency - {} > {} seconds", sinceLastModified, tickFrequencyInSeconds);
     }
 
     private String logPrefix() {
@@ -170,6 +170,15 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
     }
 
     /**
+     * Maximum number of tuple kept in memory before inserting batches to cassandra.
+     * @param size the max queue size.
+     * @return <code>this</code>
+     */
+    public BatchCassandraWriterBolt withQueueSize(int size) {
+        this.batchMaxSize = size;
+        return this;
+    }
+    /**
      * {@inheritDoc}
      */
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
index 663f26a..19097f2 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -63,7 +63,7 @@ public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> {
      * {@inheritDoc}
      */
     @Override
-    protected void tick() {
+    protected void onTickTuple() {
         /** do nothing **/
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
index ccee468..9201801 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -18,11 +18,17 @@
  */
 package org.apache.storm.cassandra.client;
 
+import backtype.storm.utils.Utils;
 import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.policies.DefaultRetryPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.FallthroughRetryPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
 import com.google.common.base.Objects;
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Configuration used by cassandra storm components.
@@ -36,6 +42,9 @@ public class CassandraConf implements Serializable {
     public static final String CASSANDRA_NODES              = "cassandra.nodes";
     public static final String CASSANDRA_PORT               = "cassandra.port";
     public static final String CASSANDRA_BATCH_SIZE_ROWS    = "cassandra.batch.size.rows";
+    public static final String CASSANDRA_RETRY_POLICY       = "cassandra.retryPolicy";
+    public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS  = "cassandra.reconnectionPolicy.baseDelayMs";
+    public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS   = "cassandra.reconnectionPolicy.maxDelayMs";
 
     /**
      * The authorized cassandra username.
@@ -67,6 +76,21 @@ public class CassandraConf implements Serializable {
      * The maximal numbers of rows per batch.
      */
     private int batchSizeRows       = 100;
+
+    /**
+     * The retry policy to use for the new cluster.
+     */
+    private String retryPolicyName;
+
+    /**
+     * The base delay in milliseconds to use for the reconnection policy.
+     */
+    private long reconnectionPolicyBaseMs;
+
+    /**
+     * The maximum delay to wait between two attempts.
+     */
+    private long reconnectionPolicyMaxMs;
     
     /**
      * Creates a new {@link CassandraConf} instance.
@@ -81,13 +105,17 @@ public class CassandraConf implements Serializable {
      * @param conf The storm configuration.
      */
     public CassandraConf(Map<String, Object> conf) {
-        this.username = getOrElse(conf, CASSANDRA_USERNAME, null);
-        this.password = getOrElse(conf, CASSANDRA_PASSWORD, null);
+
+        this.username = (String)Utils.get(conf, CASSANDRA_USERNAME, null);
+        this.password = (String)Utils.get(conf, CASSANDRA_PASSWORD, null);
         this.keyspace = get(conf, CASSANDRA_KEYSPACE);
-        this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
-        this.nodes    = getOrElse(conf, CASSANDRA_NODES, "localhost").split(",");
-        this.batchSizeRows = getOrElse(conf, CASSANDRA_BATCH_SIZE_ROWS, 100);
-        this.port = conf.get(CASSANDRA_PORT) != null ? Integer.valueOf((String)conf.get(CASSANDRA_PORT)) : 9042;
+        this.consistencyLevel = ConsistencyLevel.valueOf((String)Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+        this.nodes    = ((String)Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
+        this.batchSizeRows = Utils.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
+        this.port = Utils.getInt(conf.get(CASSANDRA_PORT), 9042);
+        this.retryPolicyName = (String)Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
+        this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
+        this.reconnectionPolicyMaxMs  = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1));
     }
 
     public String getUsername() {
@@ -118,6 +146,24 @@ public class CassandraConf implements Serializable {
         return this.port;
     }
 
+    public long getReconnectionPolicyBaseMs() {
+        return reconnectionPolicyBaseMs;
+    }
+
+    public long getReconnectionPolicyMaxMs() {
+        return reconnectionPolicyMaxMs;
+    }
+
+    public RetryPolicy getRetryPolicy() {
+        if(this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName()))
+            return DowngradingConsistencyRetryPolicy.INSTANCE;
+        if(this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName()))
+            return FallthroughRetryPolicy.INSTANCE;
+        if(this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName()))
+            return DefaultRetryPolicy.INSTANCE;
+        throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName);
+    }
+
     private <T> T get(Map<String, Object> conf, String key) {
         Object o = conf.get(key);
         if(o == null) {
@@ -126,9 +172,16 @@ public class CassandraConf implements Serializable {
         return (T)o;
     }
 
-    private <T> T getOrElse(Map<String, Object> conf, String key, T def) {
-        T o = (T) conf.get(key);
-        return (o == null) ? def : o;
+    public static Long getLong(Object o, Long defaultValue) {
+        if (null == o) {
+            return defaultValue;
+        }
+        if (o instanceof Number) {
+            return ((Number) o).longValue();
+        } else if (o instanceof String) {
+            return Long.parseLong((String) o);
+        }
+        throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
     }
 
     @Override
@@ -141,6 +194,9 @@ public class CassandraConf implements Serializable {
                 .add("port", port)
                 .add("consistencyLevel", consistencyLevel)
                 .add("batchSizeRows", batchSizeRows)
+                .add("retryPolicyName", retryPolicyName)
+                .add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs)
+                .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs)
                 .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
index 886f6d3..c00bfd7 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
@@ -50,8 +50,10 @@ public class ClusterFactory extends BaseBeanFactory<Cluster> {
                 .withoutMetrics()
                 .addContactPoints(cassandraConf.getNodes())
                 .withPort(cassandraConf.getPort())
-                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
-                .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)))
+                .withRetryPolicy(cassandraConf.getRetryPolicy())
+                .withReconnectionPolicy(new ExponentialReconnectionPolicy(
+                        cassandraConf.getReconnectionPolicyBaseMs(),
+                        cassandraConf.getReconnectionPolicyMaxMs()))
                 .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
 
         final String username = cassandraConf.getUsername();

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
index 8ed9293..945d0a8 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -77,14 +77,14 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
     @Override
     public synchronized Session connect() throws NoHostAvailableException {
         if( isDisconnected() ) {
-            LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName()));
+            LOG.info("Connected to cluster: {}", cluster.getClusterName());
             for ( Host host : getAllHosts())
-                LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()));
+                LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
 
-            LOG.info(String.format("Connect to cluster using keyspace %s", keyspace));
+            LOG.info("Connect to cluster using keyspace %s", keyspace);
             session = cluster.connect(keyspace);
         } else {
-            LOG.warn(String.format("%s - Already connected to cluster: %s", getExecutorName(), cluster.getClusterName()));
+            LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName());
         }
 
         if( session.isClosed() ) {
@@ -107,7 +107,7 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
     @Override
     public void close( ) {
         if( cluster != null && !cluster.isClosed() ) {
-            LOG.info(String.format("Try to close connection to cluster: %s", cluster.getClusterName()));
+            LOG.info("Try to close connection to cluster: {}", cluster.getClusterName());
             session.close();
             cluster.close();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
index 311ed11..5366c81 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Service to asynchronously executes cassandra statements.
@@ -47,7 +48,7 @@ public class AsyncExecutor<T> implements Serializable {
 
     protected AsyncResultHandler<T> handler;
 
-    private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
+    private AtomicInteger pending = new AtomicInteger();
 
     /**
      * Creates a new {@link AsyncExecutor} instance.
@@ -73,8 +74,8 @@ public class AsyncExecutor<T> implements Serializable {
     }
 
     /**
-     * Asynchronously executes all statements associated to the specified input. The input will be passed to
-     * the {@link #handler} once all queries succeed or failed.
+     * Asynchronously executes all statements associated to the specified input.
+     * The input will be passed to handler#onSuccess once all queries succeed or to handler#onFailure if any one of them fails.
      */
     public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) {
 
@@ -111,11 +112,11 @@ public class AsyncExecutor<T> implements Serializable {
      */
     public SettableFuture<T> execAsync(final Statement statement, final T inputs, final AsyncResultHandler<T> handler) {
         final SettableFuture<T> settableFuture = SettableFuture.create();
-        pending.put(settableFuture, true);
+        pending.incrementAndGet();
         ResultSetFuture future = session.executeAsync(statement);
         Futures.addCallback(future, new FutureCallback<ResultSet>() {
             public void release() {
-                pending.remove(settableFuture);
+                pending.decrementAndGet();
             }
 
             @Override
@@ -139,8 +140,8 @@ public class AsyncExecutor<T> implements Serializable {
     /**
      * Returns the number of currently executed tasks which are not yet completed.
      */
-    public int getPendingExec( ) {
-        return this.pending.size();
+    public int getPendingTasksSize() {
+        return this.pending.intValue();
     }
 
     public void shutdown( ) {

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 8d80ee1..4253189 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.ResultSet;
 import org.apache.storm.cassandra.WeatherSpout;
 import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
@@ -36,7 +37,7 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
     public static final String SPOUT_MOCK  = "spout-mock";
     public static final String BOLT_WRITER = "writer";
 
-    @Test
+    @Test @Ignore("The sleep method should be used in tests")
     public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
         executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5565c438/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index e1a9e9f..7717d4d 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.ResultSet;
 import org.apache.storm.cassandra.WeatherSpout;
 import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
@@ -35,7 +36,7 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
     public static final String SPOUT_MOCK  = "spout-mock";
     public static final String BOLT_WRITER = "writer";
 
-    @Test
+    @Test @Ignore("The sleep method should be used in tests")
     public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
         executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
     }