You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/02 00:49:24 UTC
[3/7] storm git commit: STORM-1075 clean/refactor external module
storm-cassandra
STORM-1075 clean/refactor external module storm-cassandra
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b039d331
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b039d331
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b039d331
Branch: refs/heads/master
Commit: b039d331aaa2fe99ac038e670c5b8f6f1c126ccf
Parents: 641300e
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Wed Nov 18 18:20:21 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Wed Nov 25 09:51:55 2015 +0100
----------------------------------------------------------------------
external/storm-cassandra/README.md | 36 +++++++---
.../storm/cassandra/Murmur3StreamGrouping.java | 46 +++++++++++-
.../storm/cassandra/bolt/BaseCassandraBolt.java | 9 +--
.../bolt/BatchCassandraWriterBolt.java | 27 ++++---
.../cassandra/bolt/CassandraWriterBolt.java | 2 +-
.../storm/cassandra/client/CassandraConf.java | 74 +++++++++++++++++---
.../storm/cassandra/client/ClusterFactory.java | 6 +-
.../cassandra/client/impl/DefaultClient.java | 10 +--
.../storm/cassandra/executor/AsyncExecutor.java | 15 ++--
9 files changed, 178 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 96454b6..5337b29 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -12,15 +12,18 @@ Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*
### Configuration
The following properties may be passed to storm configuration.
-| **Property name** | **Description** | **Default** |
-| ------------------------------------- | ----------------| -------------|
-| **cassandra.keyspace** | - | |
-| **cassandra.nodes** | - | {"localhost"}|
-| **cassandra.username** | - | - |
-| **cassandra.password** | - | - |
-| **cassandra.port** | - | 9092 |
-| **cassandra.output.consistencyLevel** | - | ONE |
-| **cassandra.batch.size.rows** | - | 100 |
+| **Property name** | **Description** | **Default** |
+| ---------------------------------------------| ----------------| --------------------|
+| **cassandra.keyspace** | - | |
+| **cassandra.nodes** | - | {"localhost"} |
+| **cassandra.username** | - | - |
+| **cassandra.password** | - | - |
+| **cassandra.port** | - | 9092 |
+| **cassandra.output.consistencyLevel** | - | ONE |
+| **cassandra.batch.size.rows** | - | 100 |
+| **cassandra.retryPolicy** | - | DefaultRetryPolicy |
+| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
+| **cassandra.reconnectionPolicy.maxDelayMs** | - | 60000 (ms) |
### CassandraWriterBolt
@@ -160,6 +163,21 @@ For instance, this may be used to remit a new tuple on error, or to chain querie
}
```
+### Murmur3FieldGrouping
+
+[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) can be used to optimise cassandra writes.
+The stream is partitioned among the bolt's tasks based on the specified row partition keys.
+
+```java
+CassandraWriterBolt bolt = new CassandraWriterBolt(
+ insertInto("album")
+ .values(
+ with(fields("title", "year", "performer", "genre", "tracks")
+ ).build());
+builder.setBolt("BOLT_WRITER", bolt, 4)
+ .customGrouping("spout", new Murmur3StreamGrouping("title"))
+```
+
## License
Licensed to the Apache Software Foundation (ASF) under one
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
index a3f6887..966aacd 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -22,13 +22,18 @@ import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Fields;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -41,12 +46,43 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping {
private List<Integer> targetTasks;
+ private List<Integer> partitionKeyIndexes;
+
+ /**
+ * A list of partition key. The order of specified keys will be used to generate the partition key hash.
+ * It should respect the column order defined into the targeted CQL table.
+ */
+ private List<String> partitionKeyNames;
+
+ /**
+ * Creates a new {@link Murmur3StreamGrouping} instance.
+ * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}.
+ */
+ public Murmur3StreamGrouping(String...partitionKeyNames) {
+ this( Arrays.asList(partitionKeyNames));
+ }
+
+ /**
+ * Creates a new {@link Murmur3StreamGrouping} instance.
+ * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}.
+ */
+ public Murmur3StreamGrouping(List<String> partitionKeyNames) {
+ this.partitionKeyNames = partitionKeyNames;
+ }
+
+
/**
* {@inheritDoc}
*/
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
+
+ this.partitionKeyIndexes = new ArrayList<>();
+ Fields componentOutputFields = context.getComponentOutputFields(stream);
+ for (String partitionKeyName : partitionKeyNames) {
+ partitionKeyIndexes.add(componentOutputFields.fieldIndex(partitionKeyName));
+ }
}
/**
@@ -55,13 +91,21 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping {
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
try {
- int n = Math.abs( (int) hashes(values) % targetTasks.size() );
+ int n = Math.abs( (int) hashes(getKeyValues(values)) % targetTasks.size() );
return Lists.newArrayList(targetTasks.get(n));
} catch (IOException e) {
throw new FailedException(e);
}
}
+ private List<Object> getKeyValues(List<Object> values) {
+ List<Object> keys = new ArrayList<>();
+ for(Integer idx : partitionKeyIndexes) {
+ keys.add(values.get(idx));
+ }
+ return keys;
+ }
+
/**
* Computes the murmur3 hash for the specified values.
* http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
index 7211ad3..dafcb22 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -26,6 +26,7 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleUtils;
+import backtype.storm.utils.Utils;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.apache.storm.cassandra.BaseExecutionResultHandler;
@@ -105,7 +106,7 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
}
public BaseCassandraBolt withOutputFields(Fields fields) {
- this.outputsFields.put(null, fields);
+ this.outputsFields.put(Utils.DEFAULT_STREAM_ID, fields);
return this;
}
@@ -139,7 +140,7 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
public final void execute(Tuple input) {
getAsyncHandler().flush(outputCollector);
if (TupleUtils.isTick(input)) {
- tick();
+ onTickTuple();
outputCollector.ack(input);
} else {
process(input);
@@ -156,14 +157,14 @@ public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
/**
* Calls by an input tick tuple.
*/
- abstract protected void tick();
+ abstract protected void onTickTuple();
/**
* {@inheritDoc}
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- Fields fields = this.outputsFields.remove(null);
+ Fields fields = this.outputsFields.remove(Utils.DEFAULT_STREAM_ID);
if( fields != null) declarer.declare(fields);
for(Map.Entry<String, Fields> entry : this.outputsFields.entrySet()) {
declarer.declareStream(entry.getKey(), entry.getValue());
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
index c4c0110..fd597df 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -42,14 +42,14 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
public static final int DEFAULT_EMIT_FREQUENCY = 2;
- private static final int QUEUE_MAX_SIZE = 1000;
-
private LinkedBlockingQueue<Tuple> queue;
private int tickFrequencyInSeconds;
private long lastModifiedTimesMillis;
+ private int batchMaxSize = 1000;
+
private String componentID;
private AsyncResultHandler<List<Tuple>> asyncResultHandler;
@@ -79,7 +79,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
super.prepare(stormConfig, topologyContext, outputCollector);
this.componentID = topologyContext.getThisComponentId();
- this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+ this.queue = new LinkedBlockingQueue<>(batchMaxSize);
this.lastModifiedTimesMillis = now();
}
@@ -106,7 +106,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
* {@inheritDoc}
*/
@Override
- protected void tick() {
+ protected void onTickTuple() {
prepareAndExecuteStatement();
}
@@ -119,7 +119,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
List<PairStatementTuple> psl = buildStatement(inputs);
int sinceLastModified = updateAndGetSecondsSinceLastModified();
- LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified));
+ LOG.debug(logPrefix() + "Execute cql batches with {} statements after {} seconds", size, sinceLastModified);
checkTimeElapsedSinceLastExec(sinceLastModified);
@@ -127,14 +127,14 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
int batchSize = 0;
for (PairBatchStatementTuples batch : batchBuilder) {
- LOG.debug(logPrefix() + String.format("Writing data to %s in batches of %s rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size()));
+ LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size());
getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
batchSize++;
}
- int pending = getAsyncExecutor().getPendingExec();
+ int pending = getAsyncExecutor().getPendingTasksSize();
if (pending > batchSize) {
- LOG.warn( logPrefix() + String.format("Currently pending tasks is superior to the number of submit batches(%s) : %s", batchSize, pending));
+ LOG.warn( logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize, pending);
}
} catch (Throwable r) {
@@ -157,7 +157,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
if(sinceLastModified > tickFrequencyInSeconds)
- LOG.warn( logPrefix() + String.format("The time elapsed since last execution exceeded tick tuple frequency - %d > %d seconds", sinceLastModified, tickFrequencyInSeconds));
+ LOG.warn( logPrefix() + "The time elapsed since last execution exceeded tick tuple frequency - {} > {} seconds", sinceLastModified, tickFrequencyInSeconds);
}
private String logPrefix() {
@@ -170,6 +170,15 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
}
/**
+ * Maximum number of tuple kept in memory before inserting batches to cassandra.
+ * @param size the max queue size.
+ * @return <code>this</code>
+ */
+ public BatchCassandraWriterBolt withQueueSize(int size) {
+ this.batchMaxSize = size;
+ return this;
+ }
+ /**
* {@inheritDoc}
*/
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
index 663f26a..19097f2 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -63,7 +63,7 @@ public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> {
* {@inheritDoc}
*/
@Override
- protected void tick() {
+ protected void onTickTuple() {
/** do nothing **/
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
index ccee468..9201801 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -18,11 +18,17 @@
*/
package org.apache.storm.cassandra.client;
+import backtype.storm.utils.Utils;
import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.policies.DefaultRetryPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.FallthroughRetryPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
import com.google.common.base.Objects;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* Configuration used by cassandra storm components.
@@ -36,6 +42,9 @@ public class CassandraConf implements Serializable {
public static final String CASSANDRA_NODES = "cassandra.nodes";
public static final String CASSANDRA_PORT = "cassandra.port";
public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows";
+ public static final String CASSANDRA_RETRY_POLICY = "cassandra.retryPolicy";
+ public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = "cassandra.reconnectionPolicy.baseDelayMs";
+ public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = "cassandra.reconnectionPolicy.maxDelayMs";
/**
* The authorized cassandra username.
@@ -67,6 +76,21 @@ public class CassandraConf implements Serializable {
* The maximal numbers of rows per batch.
*/
private int batchSizeRows = 100;
+
+ /**
+ * The retry policy to use for the new cluster.
+ */
+ private String retryPolicyName;
+
+ /**
+ * The base delay in milliseconds to use for the reconnection policy.
+ */
+ private long reconnectionPolicyBaseMs;
+
+ /**
+ * The maximum delay to wait between two attempts.
+ */
+ private long reconnectionPolicyMaxMs;
/**
* Creates a new {@link CassandraConf} instance.
@@ -81,13 +105,17 @@ public class CassandraConf implements Serializable {
* @param conf The storm configuration.
*/
public CassandraConf(Map<String, Object> conf) {
- this.username = getOrElse(conf, CASSANDRA_USERNAME, null);
- this.password = getOrElse(conf, CASSANDRA_PASSWORD, null);
+
+ this.username = (String)Utils.get(conf, CASSANDRA_USERNAME, null);
+ this.password = (String)Utils.get(conf, CASSANDRA_PASSWORD, null);
this.keyspace = get(conf, CASSANDRA_KEYSPACE);
- this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
- this.nodes = getOrElse(conf, CASSANDRA_NODES, "localhost").split(",");
- this.batchSizeRows = getOrElse(conf, CASSANDRA_BATCH_SIZE_ROWS, 100);
- this.port = conf.get(CASSANDRA_PORT) != null ? Integer.valueOf((String)conf.get(CASSANDRA_PORT)) : 9042;
+ this.consistencyLevel = ConsistencyLevel.valueOf((String)Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+ this.nodes = ((String)Utils.get(conf, CASSANDRA_NODES, "localhost")).split(",");
+ this.batchSizeRows = Utils.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
+ this.port = Utils.getInt(conf.get(CASSANDRA_PORT), 9042);
+ this.retryPolicyName = (String)Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
+ this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
+ this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1));
}
public String getUsername() {
@@ -118,6 +146,24 @@ public class CassandraConf implements Serializable {
return this.port;
}
+ public long getReconnectionPolicyBaseMs() {
+ return reconnectionPolicyBaseMs;
+ }
+
+ public long getReconnectionPolicyMaxMs() {
+ return reconnectionPolicyMaxMs;
+ }
+
+ public RetryPolicy getRetryPolicy() {
+ if(this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName()))
+ return DowngradingConsistencyRetryPolicy.INSTANCE;
+ if(this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName()))
+ return FallthroughRetryPolicy.INSTANCE;
+ if(this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName()))
+ return DefaultRetryPolicy.INSTANCE;
+ throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName);
+ }
+
private <T> T get(Map<String, Object> conf, String key) {
Object o = conf.get(key);
if(o == null) {
@@ -126,9 +172,16 @@ public class CassandraConf implements Serializable {
return (T)o;
}
- private <T> T getOrElse(Map<String, Object> conf, String key, T def) {
- T o = (T) conf.get(key);
- return (o == null) ? def : o;
+ public static Long getLong(Object o, Long defaultValue) {
+ if (null == o) {
+ return defaultValue;
+ }
+ if (o instanceof Number) {
+ return ((Number) o).longValue();
+ } else if (o instanceof String) {
+ return Long.parseLong((String) o);
+ }
+ throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
}
@Override
@@ -141,6 +194,9 @@ public class CassandraConf implements Serializable {
.add("port", port)
.add("consistencyLevel", consistencyLevel)
.add("batchSizeRows", batchSizeRows)
+ .add("retryPolicyName", retryPolicyName)
+ .add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs)
+ .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs)
.toString();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
index 886f6d3..c00bfd7 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
@@ -50,8 +50,10 @@ public class ClusterFactory extends BaseBeanFactory<Cluster> {
.withoutMetrics()
.addContactPoints(cassandraConf.getNodes())
.withPort(cassandraConf.getPort())
- .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
- .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)))
+ .withRetryPolicy(cassandraConf.getRetryPolicy())
+ .withReconnectionPolicy(new ExponentialReconnectionPolicy(
+ cassandraConf.getReconnectionPolicyBaseMs(),
+ cassandraConf.getReconnectionPolicyMaxMs()))
.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
final String username = cassandraConf.getUsername();
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
index 8ed9293..945d0a8 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -77,14 +77,14 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
@Override
public synchronized Session connect() throws NoHostAvailableException {
if( isDisconnected() ) {
- LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName()));
+ LOG.info("Connected to cluster: {}", cluster.getClusterName());
for ( Host host : getAllHosts())
- LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()));
+ LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
- LOG.info(String.format("Connect to cluster using keyspace %s", keyspace));
+ LOG.info("Connect to cluster using keyspace %s", keyspace);
session = cluster.connect(keyspace);
} else {
- LOG.warn(String.format("%s - Already connected to cluster: %s", getExecutorName(), cluster.getClusterName()));
+ LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName());
}
if( session.isClosed() ) {
@@ -107,7 +107,7 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
@Override
public void close( ) {
if( cluster != null && !cluster.isClosed() ) {
- LOG.info(String.format("Try to close connection to cluster: %s", cluster.getClusterName()));
+ LOG.info("Try to close connection to cluster: {}", cluster.getClusterName());
session.close();
cluster.close();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b039d331/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
index 311ed11..5366c81 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Service to asynchronously executes cassandra statements.
@@ -47,7 +48,7 @@ public class AsyncExecutor<T> implements Serializable {
protected AsyncResultHandler<T> handler;
- private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
+ private AtomicInteger pending = new AtomicInteger();
/**
* Creates a new {@link AsyncExecutor} instance.
@@ -73,8 +74,8 @@ public class AsyncExecutor<T> implements Serializable {
}
/**
- * Asynchronously executes all statements associated to the specified input. The input will be passed to
- * the {@link #handler} once all queries succeed or failed.
+ * Asynchronously executes all statements associated to the specified input.
+ * The input will be passed to handler#onSuccess once all queries succeed or to handler#onFailure if any one of them fails.
*/
public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) {
@@ -111,11 +112,11 @@ public class AsyncExecutor<T> implements Serializable {
*/
public SettableFuture<T> execAsync(final Statement statement, final T inputs, final AsyncResultHandler<T> handler) {
final SettableFuture<T> settableFuture = SettableFuture.create();
- pending.put(settableFuture, true);
+ pending.incrementAndGet();
ResultSetFuture future = session.executeAsync(statement);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
public void release() {
- pending.remove(settableFuture);
+ pending.decrementAndGet();
}
@Override
@@ -139,8 +140,8 @@ public class AsyncExecutor<T> implements Serializable {
/**
* Returns the number of currently executed tasks which are not yet completed.
*/
- public int getPendingExec( ) {
- return this.pending.size();
+ public int getPendingTasksSize() {
+ return this.pending.intValue();
}
public void shutdown( ) {