You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/02 00:49:23 UTC
[2/7] storm git commit: STORM-1075 add external module storm-cassandra
STORM-1075 add external module storm-cassandra
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/641300e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/641300e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/641300e2
Branch: refs/heads/master
Commit: 641300e2441d732fc2be98f05e70c6e5db4e4cf6
Parents: 437c4b1
Author: Florian Hussonnois <fl...@zenika.com>
Authored: Wed Oct 28 12:08:58 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Wed Nov 25 09:50:15 2015 +0100
----------------------------------------------------------------------
external/storm-cassandra/README.md | 180 +++++++++++++++++
external/storm-cassandra/pom.xml | 124 ++++++++++++
.../AbstractExecutionResultHandler.java | 60 ++++++
.../cassandra/BaseExecutionResultHandler.java | 85 ++++++++
.../storm/cassandra/CassandraContext.java | 92 +++++++++
.../cassandra/DynamicStatementBuilder.java | 199 +++++++++++++++++++
.../storm/cassandra/ExecutionResultHandler.java | 98 +++++++++
.../storm/cassandra/Murmur3StreamGrouping.java | 89 +++++++++
.../storm/cassandra/bolt/BaseCassandraBolt.java | 193 ++++++++++++++++++
.../bolt/BatchCassandraWriterBolt.java | 192 ++++++++++++++++++
.../cassandra/bolt/CassandraWriterBolt.java | 72 +++++++
.../cassandra/bolt/GroupingBatchBuilder.java | 68 +++++++
.../bolt/PairBatchStatementTuples.java | 52 +++++
.../cassandra/bolt/PairStatementTuple.java | 50 +++++
.../storm/cassandra/client/CassandraConf.java | 146 ++++++++++++++
.../storm/cassandra/client/ClusterFactory.java | 71 +++++++
.../storm/cassandra/client/SimpleClient.java | 42 ++++
.../cassandra/client/SimpleClientProvider.java | 35 ++++
.../cassandra/client/impl/DefaultClient.java | 123 ++++++++++++
.../cassandra/context/BaseBeanFactory.java | 65 ++++++
.../storm/cassandra/context/BeanFactory.java | 48 +++++
.../storm/cassandra/context/WorkerCtx.java | 89 +++++++++
.../storm/cassandra/executor/AsyncExecutor.java | 152 ++++++++++++++
.../executor/AsyncExecutorProvider.java | 40 ++++
.../cassandra/executor/AsyncResultHandler.java | 64 ++++++
.../executor/ExecutionResultCollector.java | 99 +++++++++
.../executor/impl/BatchAsyncResultHandler.java | 73 +++++++
.../executor/impl/SingleAsyncResultHandler.java | 72 +++++++
.../query/BatchStatementTupleMapper.java | 57 ++++++
.../cassandra/query/CQLClauseTupleMapper.java | 36 ++++
.../cassandra/query/CQLStatementBuilder.java | 31 +++
.../query/CQLStatementTupleMapper.java | 86 ++++++++
.../cassandra/query/CQLTableTupleMapper.java | 39 ++++
.../cassandra/query/CQLValuesTupleMapper.java | 74 +++++++
.../storm/cassandra/query/ContextQuery.java | 83 ++++++++
.../query/SimpleCQLStatementTupleMapper.java | 51 +++++
.../query/impl/BoundStatementMapperBuilder.java | 107 ++++++++++
.../query/impl/InsertStatementBuilder.java | 153 ++++++++++++++
.../query/impl/UpdateStatementBuilder.java | 118 +++++++++++
.../cassandra/query/selector/FieldSelector.java | 68 +++++++
.../cassandra/DynamicStatementBuilderTest.java | 133 +++++++++++++
.../apache/storm/cassandra/WeatherSpout.java | 84 ++++++++
.../storm/cassandra/bolt/BaseTopologyTest.java | 60 ++++++
.../bolt/BatchCassandraWriterBoltTest.java | 62 ++++++
.../cassandra/bolt/CassandraWriterBoltTest.java | 63 ++++++
.../src/test/resources/schema.cql | 7 +
pom.xml | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 15 +-
48 files changed, 4000 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
new file mode 100644
index 0000000..96454b6
--- /dev/null
+++ b/external/storm-cassandra/README.md
@@ -0,0 +1,180 @@
+Storm Cassandra Integration (CQL).
+-------------------
+
+[Apache Storm](https://storm.apache.org/) is a free and open source distributed realtime computation system.
+
+### Bolt API implementation for Apache Cassandra
+
+This library provides core Storm bolts on top of Apache Cassandra.
+It also provides a simple DSL to map a Storm *Tuple* to a Cassandra Query Language *Statement*.
+
+
+### Configuration
+The following properties may be passed to storm configuration.
+
+| **Property name** | **Description** | **Default** |
+| ------------------------------------- | ----------------| -------------|
+| **cassandra.keyspace** | - | |
+| **cassandra.nodes** | - | {"localhost"}|
+| **cassandra.username** | - | - |
+| **cassandra.password** | - | - |
+| **cassandra.port** | - | 9042 |
+| **cassandra.output.consistencyLevel** | - | ONE |
+| **cassandra.batch.size.rows** | - | 100 |
+
+### CassandraWriterBolt
+
+#### Static import
+```java
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+
+```
+
+#### Insert Query Builder
+##### Insert query including only the specified tuple fields.
+```java
+
+ new CassandraWriterBolt(
+ insertInto("album")
+ .values(
+ with(fields("title", "year", "performer", "genre", "tracks"))
+ ).build());
+```
+##### Insert query including all tuple fields.
+```java
+
+ new CassandraWriterBolt(
+ insertInto("album")
+ .values(all()).build());
+```
+
+##### Insert multiple queries from one input tuple.
+```java
+
+ new CassandraWriterBolt(
+ async(
+ insertInto("titles_per_album").values(all()),
+ insertInto("titles_per_performer").values(all())
+ )
+ );
+```
+
+##### Insert query including some fields with aliases
+```java
+
+ new CassandraWriterBolt(
+ insertInto("album")
+ .values(
+ with(
+ field("ti").as("title"),
+ field("ye").as("year"),
+ field("pe").as("performer"),
+ field("ge").as("genre"),
+ field("tr").as("tracks"))
+ ).build());
+```
+
+##### Insert query with static bound query
+```java
+
+ new CassandraWriterBolt(
+ boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);")
+ .bind(all()));
+```
+
+##### Insert query with bound statement load from storm configuration
+```java
+
+ new CassandraWriterBolt(
+ boundQuery(named("insertIntoAlbum"))
+ .bind(all()));
+```
+
+##### Insert query with bound statement load from tuple field
+```java
+
+ new CassandraWriterBolt(
+ boundQuery(namedByField("cql"))
+ .bind(all()));
+```
+
+##### Insert query with batch statement
+```java
+
+ // Logged
+ new CassandraWriterBolt(loggedBatch(
+ insertInto("title_per_album").values(all()),
+ insertInto("title_per_performer").values(all())
+ )
+ );
+// UnLogged
+ new CassandraWriterBolt(unLoggedBatch(
+ insertInto("title_per_album").values(all()),
+ insertInto("title_per_performer").values(all())
+ )
+ );
+```
+
+### How to handle query execution results
+
+The interface *ExecutionResultHandler* can be used to customize how an execution result should be handled.
+
+```java
+public interface ExecutionResultHandler extends Serializable {
+ void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+
+ void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+ void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+
+ void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+
+ void onQuerySuccess(OutputCollector collector, Tuple tuple);
+}
+```
+
+By default, the CassandraBolt fails a tuple on any Cassandra exception (see [BaseExecutionResultHandler](https://github.com/apache/storm/blob/master/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).
+
+```java
+ new CassandraWriterBolt(insertInto("album").values(all()).build())
+ .withResultHandler(new MyCustomResultHandler());
+```
+
+### Declare Output fields
+
+A CassandraBolt can declare output fields / stream output fields.
+For instance, this may be used to emit a new tuple on error, or to chain queries.
+
+```java
+ new CassandraWriterBolt(insertInto("album").values(all()).build())
+ .withResultHandler(new EmitOnDriverExceptionResultHandler())
+ .withStreamOutputFields("stream_error", new Fields("message"));
+
+ public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
+ @Override
+ protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
+ LOG.error("An error occurred while executing cassandra statement", e);
+ collector.emit("stream_error", new Values(e.getMessage()));
+ collector.ack(tuple);
+ }
+ }
+```
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
new file mode 100644
index 0000000..86cd87d
--- /dev/null
+++ b/external/storm-cassandra/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <packaging>jar</packaging>
+ <artifactId>storm-cassandra</artifactId>
+ <name>storm-cassandra</name>
+ <description>Storm Bolts for Apache Cassandra</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <java.version>1.7</java.version>
+ <org.slf4j.version>1.7.6</org.slf4j.version>
+ <jackson.databind.version>2.3.2</jackson.databind.version>
+ <junit.version>4.11</junit.version>
+ <guava.version>16.0.1</guava.version>
+ <commons-lang3.version>3.3</commons-lang3.version>
+ <cassandra.driver.core.version>2.1.7.1</cassandra.driver.core.version>
+ </properties>
+
+ <developers>
+ <developer>
+ <id>fhuss</id>
+ <name>Florian Hussonnois</name>
+ <email>florian.hussonnois@gmail.com</email>
+ <url>https://github.com/fhussonnois</url>
+ <roles>
+ <role>developer</role>
+ </roles>
+ </developer>
+ </developers>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${cassandra.driver.core.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${org.slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.cassandraunit</groupId>
+ <artifactId>cassandra-unit</artifactId>
+ <version>2.1.3.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
new file mode 100644
index 0000000..80ae284
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.datastax.driver.core.exceptions.WriteTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Abstract base {@link ExecutionResultHandler} that dispatches a failure to the callback matching the type of the thrown exception.
+ *
+ */
+public abstract class AbstractExecutionResultHandler implements ExecutionResultHandler {
+
+ public static final Logger LOG = LoggerFactory.getLogger(AbstractExecutionResultHandler.class);
+
+ @Override
+ public void onThrowable(Throwable t, OutputCollector collector, Tuple i) {
+ if( t instanceof QueryValidationException) {
+ this.onQueryValidationException((QueryValidationException)t, collector, i);
+ } else if (t instanceof ReadTimeoutException) {
+ this.onReadTimeoutException((ReadTimeoutException)t, collector, i);
+ } else if (t instanceof WriteTimeoutException) {
+ this.onWriteTimeoutException((WriteTimeoutException) t, collector, i);
+ } else if (t instanceof UnavailableException) {
+ this.onUnavailableException((UnavailableException) t, collector, i);
+ } else {
+ collector.reportError(t);
+ collector.fail(i);
+ }
+ }
+
+ @Override
+ public void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl) {
+ for(Tuple i : tl) onThrowable(t, collector, i);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
new file mode 100644
index 0000000..c7fc4f1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.*;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link ExecutionResultHandler} which fails the incoming tuple when a {@link com.datastax.driver.core.exceptions.DriverException} is thrown.
+ * The exception is then automatically reported to Storm.
+ *
+ */
+public class BaseExecutionResultHandler extends AbstractExecutionResultHandler {
+
+ private final static org.slf4j.Logger LOG = LoggerFactory.getLogger(BaseExecutionResultHandler.class);
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple) {
+ onDriverException(e, collector, tuple);
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple) {
+ onDriverException(e, collector, tuple);
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple) {
+ onDriverException(e, collector, tuple);
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple) {
+ onDriverException(e, collector, tuple);
+ }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onQuerySuccess(OutputCollector collector, Tuple tuple) {
+
+ }
+
+ /**
+ * This method is called when one of the methods of the {@link BaseExecutionResultHandler} is not
+ * overridden. It can be practical if you want to bundle some/all of the methods to a single method.
+ *
+ * @param e the exception thrown
+ * @param collector the output collector
+ * @param tuple the tuple in failure
+ */
+ protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
+ LOG.error("An error occurred while executing cassandra statement", e);
+ collector.fail(tuple);
+ collector.reportError(e);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
new file mode 100644
index 0000000..3081b0d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.ClusterFactory;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.client.impl.DefaultClient;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+import org.apache.storm.cassandra.context.WorkerCtx;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraContext extends WorkerCtx implements SimpleClientProvider {
+
+ /**
+ * Creates a new {@link CassandraContext} instance.
+ */
+ public CassandraContext() {
+ register(SimpleClient.class, new ClientFactory());
+ register(CassandraConf.class, new CassandraConfFactory());
+ register(Cluster.class, new ClusterFactory());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SimpleClient getClient(Map<String, Object> config) {
+ SimpleClient client = getWorkerBean(SimpleClient.class, config);
+ if (client.isClose() )
+ client = getWorkerBean(SimpleClient.class, config, true);
+ return client;
+ }
+
+ /**
+ * Simple class to make {@link CassandraConf} from a Storm topology configuration.
+ */
+ public static final class CassandraConfFactory extends BaseBeanFactory<CassandraConf> {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected CassandraConf make(Map<String, Object> stormConf) {
+ return new CassandraConf(stormConf);
+ }
+ }
+
+ /**
+ * Simple class to make {@link SimpleClient} from a Storm topology configuration.
+ */
+ public static final class ClientFactory extends BaseBeanFactory<SimpleClient> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClientFactory.class);
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected SimpleClient make(Map<String, Object> stormConf) {
+ Cluster cluster = this.context.getWorkerBean(Cluster.class, stormConf);
+ if( cluster.isClosed() ) {
+ LOG.warn("Cluster is closed - trigger new initialization!");
+ cluster = this.context.getWorkerBean(Cluster.class, stormConf, true);
+ }
+ CassandraConf config = this.context.getWorkerBean(CassandraConf.class, stormConf);
+ return new DefaultClient(cluster, config.getKeyspace());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
new file mode 100644
index 0000000..deea8da
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder;
+import org.apache.storm.cassandra.query.impl.InsertStatementBuilder;
+import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class DynamicStatementBuilder implements Serializable {
+
+ private DynamicStatementBuilder() {
+ }
+
+ /**
+ * Builds a new insert statement for the specified table.
+ *
+ * @param table the table's name.
+ * @return a new {@link InsertStatementBuilder} instance.
+ */
+ public static final InsertStatementBuilder insertInto(String table) {
+ return new InsertStatementBuilder(table);
+ }
+ /**
+ * Builds a new insert statement based on the specified CQL mapper.
+ *
+ * @param mapper the CQL mapper.
+ * @return a new {@link InsertStatementBuilder} instance.
+ */
+ public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) {
+ return new InsertStatementBuilder(mapper);
+ }
+ /**
+ * Builds a new insert statement for the specified keyspace and table.
+ *
+ * @param ks the keyspace to use.
+ * @param table the table's name.
+ * @return a new {@link InsertStatementBuilder} instance.
+ */
+ public static final InsertStatementBuilder insertInto(String ks, String table) {
+ return new InsertStatementBuilder(table, ks);
+ }
+
+ /**
+ * Builds a new update statement for the specified table.
+ *
+ * @param table the table's name.
+ * @return a new {@link UpdateStatementBuilder} instance.
+ */
+ public static final UpdateStatementBuilder update(String table) {
+ return new UpdateStatementBuilder(table);
+ }
+
+ /**
+ * Builds a new update statement for the specified keyspace and table.
+ *
+ * @param ks the keyspace to use.
+ * @param table the table's name.
+ * @return a new {@link UpdateStatementBuilder} instance.
+ */
+ public static final UpdateStatementBuilder update(String ks, String table) {
+ return new UpdateStatementBuilder(table, ks);
+ }
+
+ /**
+ * Builds a new bound statement based on the specified query.
+ *
+ * @param cql the query.
+ * @return a new {@link BoundStatementMapperBuilder} instance.
+ */
+ public static final BoundStatementMapperBuilder boundQuery(String cql) {
+ return new BoundStatementMapperBuilder(cql);
+ }
+
+ /**
+ * Builds a new bound statement identified by the given field.
+ *
+ * @param field a context used to resolve the cassandra query.
+ * @return a new {@link BoundStatementMapperBuilder} instance.
+ */
+ public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) {
+ return new BoundStatementMapperBuilder(field);
+ }
+
+ /**
+ * Builds multiple statements which will be executed asynchronously.
+ *
+ * @param builders a list of {@link CQLStatementBuilder}.
+ * @return a new {@link CQLStatementTupleMapper}.
+ */
+ public static final CQLStatementTupleMapper async(final CQLStatementBuilder... builders) {
+ return new CQLStatementTupleMapper.DynamicCQLStatementTupleMapper(Arrays.asList(builders));
+ }
+
+ /**
+ * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders.
+ */
+ public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
+ return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
+ }
+ /**
+ * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
+ */
+ public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
+ return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
+ }
+ /**
+ * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders.
+ */
+ public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
+ return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders);
+ }
+
+ private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
+ List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length);
+ for(CQLStatementBuilder b : Arrays.asList(builders))
+ mappers.add(b.build());
+ return new BatchStatementTupleMapper(type, mappers);
+ }
+
+ /**
+ * Retrieves from the storm configuration the specified named query.
+ *
+ * @param name query's name.
+ */
+ public static final ContextQuery named(final String name) {
+ return new ContextQuery.BoundQueryContext(name);
+ }
+
+ /**
+ * Retrieves from the storm configuration the named query specified by a tuple field.
+ *
+ * @param fieldName name of the tuple field that contains the name of the query.
+ */
+ public static final ContextQuery namedByField(final String fieldName) {
+ return new ContextQuery.BoundQueryNamedByFieldContext(fieldName);
+ }
+
+
+ /**
+ * Maps a CQL value to the specified field from an input tuple.
+ *
+ * @param name the name of a tuple field.
+ * @return a new {@link FieldSelector}.
+ */
+ public static final FieldSelector field(final String name) {
+ return new FieldSelector(name);
+ }
+
+ /**
+ * Maps CQL values to all specified fields from an input tuple.
+ *
+ * @param fields a list of tuple fields
+ * @return a list of {@link FieldSelector}.
+ */
+ public static final FieldSelector[] fields(final String... fields) {
+ int size = fields.length;
+ List<FieldSelector> fl = new ArrayList<>(size);
+ for(int i = 0 ; i < size; i++)
+ fl.add(new FieldSelector(fields[i]));
+ return fl.toArray(new FieldSelector[size]);
+ }
+
+ /**
+ * Includes only the specified tuple fields.
+ *
+ * @param fields a list of field selector.
+ */
+ public static final CQLValuesTupleMapper with(final FieldSelector... fields) {
+ return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields));
+ }
+
+ /**
+ * Includes all tuple fields.
+ */
+ public static final CQLValuesTupleMapper all() {
+ return new CQLValuesTupleMapper.AllTupleMapper();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
new file mode 100644
index 0000000..b804ee5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.datastax.driver.core.exceptions.WriteTimeoutException;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Strategy interface defining the callbacks to apply when a query either succeeds or fails.
+ * Implementations must be serializable, since this interface extends {@link java.io.Serializable}.
+ */
+public interface ExecutionResultHandler extends Serializable {
+
+ /**
+ * Invoked when a {@link com.datastax.driver.core.exceptions.QueryValidationException} is thrown.
+ *
+ * @param e the cassandra exception.
+ * @param collector the storm collector.
+ * @param tuple the input tuple whose query raised the exception.
+ */
+ void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);
+ /**
+ * Invoked when a {@link com.datastax.driver.core.exceptions.ReadTimeoutException} is thrown.
+ *
+ * @param e the cassandra exception.
+ * @param collector the storm collector.
+ * @param tuple the input tuple whose query raised the exception.
+ */
+ void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);
+
+ /**
+ * Invoked when a {@link com.datastax.driver.core.exceptions.WriteTimeoutException} is thrown.
+ *
+ * @param e the cassandra exception.
+ * @param collector the storm collector.
+ * @param tuple the input tuple whose query raised the exception.
+ */
+ void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);
+ /**
+ * Invoked when a {@link com.datastax.driver.core.exceptions.UnavailableException} is thrown.
+ *
+ *
+ * @param e the cassandra exception.
+ * @param collector the storm collector.
+ * @param tuple the input tuple whose query raised the exception.
+ */
+ void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);
+ /**
+ * Invoked when a query has been executed successfully.
+ * This method is NOT responsible for acknowledging the input tuple.
+ *
+ * @param collector the storm collector.
+ * @param tuple the input tuple whose query succeeded.
+ */
+ void onQuerySuccess(OutputCollector collector, Tuple tuple);
+
+ /**
+ * Default method used to handle any type of exception raised for a single input tuple.
+ *
+ * @param t the thrown exception
+ * @param collector the storm collector.
+ * @param i the input tuple.
+ */
+ void onThrowable(Throwable t, OutputCollector collector, Tuple i);
+
+ /**
+ * Default method used to handle any type of exception raised for a list of input tuples.
+ *
+ * @param t the thrown exception
+ * @param collector the storm collector.
+ * @param tl the list of input tuples.
+ */
+ void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
new file mode 100644
index 0000000..a3f6887
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.FailedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link backtype.storm.grouping.CustomStreamGrouping} that picks the target task of a tuple
+ * from a Murmur3 hash of the tuple values.
+ *
+ * This stream grouping may be used to optimise writes to Apache Cassandra.
+ */
+public class Murmur3StreamGrouping implements CustomStreamGrouping {
+
+    private List<Integer> targetTasks;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = targetTasks;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Always returns a single-element list holding the task selected by the hash of {@code values}.
+     */
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        final long hash;
+        try {
+            hash = hashes(values);
+        } catch (IOException e) {
+            throw new FailedException(e);
+        }
+        // The hash is narrowed to int first, then reduced modulo the task count; the remainder
+        // lies strictly between -size and size, so Math.abs cannot overflow here.
+        final int truncated = (int) hash;
+        final int index = Math.abs(truncated % targetTasks.size());
+        return Lists.newArrayList(targetTasks.get(index));
+    }
+
+    /**
+     * Computes the murmur3 hash for the specified values.
+     * Each value is cast to {@link String} and written as a two-byte length, its UTF-8 bytes
+     * and a trailing zero byte; the concatenation is then hashed.
+     * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys
+     * https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+     *
+     * @param values the fields which are part of the (composite) partition key; each must be a String.
+     * @return the computed hash for input values.
+     * @throws java.io.IOException if writing to the in-memory buffer fails.
+     */
+    @VisibleForTesting
+    public static long hashes(List<Object> values) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (DataOutputStream writer = new DataOutputStream(buffer)) {
+            for (Object value : values) {
+                final byte[] encoded = ((String) value).getBytes("UTF-8");
+                writer.writeShort(encoded.length);
+                writer.write(encoded, 0, encoded.length);
+                writer.writeByte(0);
+            }
+            writer.flush();
+        }
+        final byte[] keyBytes = buffer.toByteArray();
+        return Hashing.murmur3_128().hashBytes(keyBytes).asLong();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
new file mode 100644
index 0000000..7211ad3
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import org.apache.storm.cassandra.BaseExecutionResultHandler;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A base cassandra bolt built on {@link backtype.storm.topology.base.BaseRichBolt}.
+ *
+ * It opens a Cassandra session in {@link #prepare}, dispatches tick tuples to {@link #tick()} and
+ * all other tuples to {@link #process(Tuple)}, and closes the client in {@link #cleanup()}.
+ *
+ * @param <T> the type of input handled by the {@link AsyncResultHandler} (see subclasses).
+ */
+public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
+
+    protected OutputCollector outputCollector;
+
+    protected SimpleClientProvider clientProvider;
+    protected SimpleClient client;
+    protected Session session;
+    protected Map stormConfig;
+
+    protected CassandraConf cassandraConfConfig;
+
+    private CQLStatementTupleMapper mapper;
+    private ExecutionResultHandler resultHandler;
+
+    // Output fields per stream id; the default stream is stored under the null key.
+    // NOTE(review): the field is transient, so it is not carried over by Java serialization;
+    // presumably it is only needed while the topology is being declared - confirm.
+    transient private Map<String, Fields> outputsFields = new HashMap<>();
+
+    /**
+     * Creates a new {@link BaseCassandraBolt} instance.
+     *
+     * @param mapper the mapper used to build CQL statements from input tuples.
+     * @param clientProvider the provider used to obtain the cassandra client in {@link #prepare}.
+     */
+    public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) {
+        this.mapper = mapper;
+        this.clientProvider = clientProvider;
+    }
+
+    /**
+     * Creates a new {@link BaseCassandraBolt} instance using a {@link CassandraContext} as client provider.
+     *
+     * @param tupleMapper the mapper used to build CQL statements from input tuples.
+     */
+    public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, new CassandraContext());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * If no cassandra host is available the error is logged and reported to the collector;
+     * the exception is not rethrown and {@link #session} is left unset.
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.outputCollector = outputCollector;
+        this.stormConfig = stormConfig;
+        this.cassandraConfConfig = new CassandraConf(stormConfig);
+        this.client = clientProvider.getClient(this.stormConfig);
+        try {
+            session = client.connect();
+        } catch (NoHostAvailableException e) {
+            // Best effort: keep the original behaviour of reporting instead of failing prepare().
+            LOG.error("Unable to connect to cassandra: no host available", e);
+            outputCollector.reportError(e);
+        }
+    }
+
+    /**
+     * Sets the handler invoked with the result of query executions.
+     *
+     * @param resultHandler the handler to use.
+     * @return this bolt.
+     */
+    public BaseCassandraBolt<T> withResultHandler(ExecutionResultHandler resultHandler) {
+        this.resultHandler = resultHandler;
+        return this;
+    }
+
+    /**
+     * Sets the fields declared on the default output stream.
+     *
+     * @param fields the output fields.
+     * @return this bolt.
+     */
+    public BaseCassandraBolt<T> withOutputFields(Fields fields) {
+        this.outputsFields.put(null, fields);
+        return this;
+    }
+
+    /**
+     * Sets the fields declared on the given named output stream.
+     *
+     * @param stream the stream id; must be neither null nor empty.
+     * @param fields the output fields.
+     * @return this bolt.
+     * @throws IllegalArgumentException if {@code stream} is null or empty.
+     */
+    public BaseCassandraBolt<T> withStreamOutputFields(String stream, Fields fields) {
+        if (stream == null || stream.length() == 0) {
+            throw new IllegalArgumentException("'stream' should not be null or empty");
+        }
+        this.outputsFields.put(stream, fields);
+        return this;
+    }
+
+    /**
+     * Returns the configured result handler, falling back to a lazily created
+     * {@link BaseExecutionResultHandler} when none was set.
+     */
+    protected ExecutionResultHandler getResultHandler() {
+        if (resultHandler == null) {
+            resultHandler = new BaseExecutionResultHandler();
+        }
+        return resultHandler;
+    }
+
+    /**
+     * Returns the mapper given at construction time.
+     */
+    protected CQLStatementTupleMapper getMapper() {
+        return mapper;
+    }
+
+    /**
+     * Returns the handler used for asynchronous results; it is flushed on every
+     * {@link #execute(Tuple)} call and on {@link #cleanup()}.
+     */
+    abstract protected AsyncResultHandler<T> getAsyncHandler();
+
+    /**
+     * Returns the executor obtained from {@link AsyncExecutorProvider#getLocal} for the current
+     * session and async handler.
+     */
+    protected AsyncExecutor<T> getAsyncExecutor() {
+        return AsyncExecutorProvider.getLocal(session, getAsyncHandler());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Flushes the async handler first; a tick tuple then triggers {@link #tick()} and is acked here,
+     * any other tuple is handed to {@link #process(Tuple)}.
+     *
+     * @param input the tuple to process.
+     */
+    @Override
+    public final void execute(Tuple input) {
+        getAsyncHandler().flush(outputCollector);
+        if (TupleUtils.isTick(input)) {
+            tick();
+            outputCollector.ack(input);
+        } else {
+            process(input);
+        }
+    }
+
+    /**
+     * Process a single tuple of input.
+     *
+     * @param input The input tuple to be processed.
+     */
+    abstract protected void process(Tuple input);
+
+    /**
+     * Called when a tick tuple is received.
+     */
+    abstract protected void tick();
+
+    /**
+     * {@inheritDoc}
+     *
+     * The registered fields are only read, never removed, so the method can safely be invoked
+     * more than once with the same result.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        Fields defaultFields = this.outputsFields.get(null);
+        if (defaultFields != null) {
+            declarer.declare(defaultFields);
+        }
+        for (Map.Entry<String, Fields> entry : this.outputsFields.entrySet()) {
+            // The null key holds the default stream, which was declared above.
+            if (entry.getKey() != null) {
+                declarer.declareStream(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        // add tick tuple each second to force acknowledgement of pending tuples.
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
+        return conf;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Shuts down the async executor, flushes the async handler and closes the client; the client
+     * is closed even when one of the first two steps throws.
+     */
+    @Override
+    public void cleanup() {
+        try {
+            getAsyncExecutor().shutdown();
+            getAsyncHandler().flush(outputCollector);
+        } finally {
+            client.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
new file mode 100644
index 0000000..c4c0110
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cassandra bolt that queues input tuples and writes them in batch statements.
+ *
+ * Tuples are buffered in a bounded queue; the queue is drained and executed on each tick tuple,
+ * or immediately when it is full.
+ */
+public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
+
+    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+    public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+    private static final int QUEUE_MAX_SIZE = 1000;
+
+    private LinkedBlockingQueue<Tuple> queue;
+
+    private int tickFrequencyInSeconds;
+
+    private long lastModifiedTimesMillis;
+
+    private String componentID;
+
+    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
+
+    /**
+     * Creates a new {@link BatchCassandraWriterBolt} instance using {@link #DEFAULT_EMIT_FREQUENCY}
+     * as tick frequency.
+     *
+     * @param tupleMapper the mapper used to build CQL statements from input tuples.
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+    }
+
+    /**
+     * Creates a new {@link BatchCassandraWriterBolt} instance.
+     *
+     * @param tupleMapper the mapper used to build CQL statements from input tuples.
+     * @param tickFrequencyInSeconds the tick tuple frequency, in seconds, set in the component configuration.
+     */
+    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+        super(tupleMapper);
+        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+        super.prepare(stormConfig, topologyContext, outputCollector);
+        this.componentID = topologyContext.getThisComponentId();
+        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+        this.lastModifiedTimesMillis = now();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The handler is created lazily on first use and wraps the result handler of this bolt.
+     */
+    @Override
+    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
+        if (asyncResultHandler == null) {
+            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
+        }
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The tuple is queued; when the queue is full the pending tuples are executed first.
+     */
+    @Override
+    protected void process(Tuple input) {
+        if (!queue.offer(input)) {
+            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
+            prepareAndExecuteStatement();
+            queue.add(input);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
+        prepareAndExecuteStatement();
+    }
+
+    /**
+     * Drains the queue, groups the mapped statements into batches and submits each batch to the
+     * async executor. Any error raised while doing so is logged and forwarded, together with the
+     * drained tuples, to the async handler's failure callback.
+     */
+    public void prepareAndExecuteStatement() {
+        int size = queue.size();
+        if (size > 0) {
+            List<Tuple> inputs = new ArrayList<>(size);
+            queue.drainTo(inputs);
+            try {
+                List<PairStatementTuple> psl = buildStatement(inputs);
+
+                int sinceLastModified = updateAndGetSecondsSinceLastModified();
+                LOG.debug("{}Execute cql batches with {} statements after {} seconds", logPrefix(), size, sinceLastModified);
+
+                checkTimeElapsedSinceLastExec(sinceLastModified);
+
+                GroupingBatchBuilder batchBuilder = new GroupingBatchBuilder(cassandraConfConfig.getBatchSizeRows(), psl);
+
+                int batchSize = 0;
+                for (PairBatchStatementTuples batch : batchBuilder) {
+                    LOG.debug("{}Writing data to {} in batches of {} rows.", logPrefix(), cassandraConfConfig.getKeyspace(), batch.getInputs().size());
+                    getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
+                    batchSize++;
+                }
+
+                int pending = getAsyncExecutor().getPendingExec();
+                if (pending > batchSize) {
+                    LOG.warn(logPrefix() + String.format("Currently pending tasks is superior to the number of submit batches(%s) : %s", batchSize, pending));
+                }
+
+            } catch (Throwable r) {
+                // Keep the cause in the log; the tuples are handed to the failure callback below.
+                LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements", r);
+                getAsyncHandler().failure(r, inputs);
+            }
+        }
+    }
+
+    /**
+     * Maps every input tuple to its statements and pairs each statement with its source tuple.
+     */
+    private List<PairStatementTuple> buildStatement(List<Tuple> inputs) {
+        List<PairStatementTuple> stmts = new ArrayList<>(inputs.size());
+
+        for (Tuple t : inputs) {
+            List<Statement> sl = getMapper().map(stormConfig, session, t);
+            for (Statement s : sl) {
+                stmts.add(new PairStatementTuple(t, s));
+            }
+        }
+        return stmts;
+    }
+
+    /**
+     * Logs a warning when more time than the tick frequency elapsed since the last execution.
+     */
+    private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
+        if (sinceLastModified > tickFrequencyInSeconds) {
+            LOG.warn(logPrefix() + String.format("The time elapsed since last execution exceeded tick tuple frequency - %d > %d seconds", sinceLastModified, tickFrequencyInSeconds));
+        }
+    }
+
+    private String logPrefix() {
+        return componentID + " - ";
+    }
+
+    /**
+     * Sets the tick tuple frequency.
+     *
+     * @param time the frequency value.
+     * @param unit the unit of {@code time}; the value is converted to whole seconds.
+     * @return this bolt.
+     */
+    public BatchCassandraWriterBolt withTickFrequency(long time, TimeUnit unit) {
+        this.tickFrequencyInSeconds = (int) unit.toSeconds(time);
+        return this;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
+        return conf;
+    }
+
+    /**
+     * Returns the number of whole seconds elapsed since the previous call (or since prepare)
+     * and resets the reference time to now.
+     */
+    private int updateAndGetSecondsSinceLastModified() {
+        long now = now();
+        // Divide as long first, then narrow: casting the millisecond difference to int before
+        // the division would overflow once the elapsed time exceeds Integer.MAX_VALUE ms.
+        int seconds = (int) ((now - lastModifiedTimesMillis) / 1000);
+        lastModifiedTimesMillis = now;
+        return seconds;
+    }
+
+    private long now() {
+        return Time.currentTimeMillis();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
new file mode 100644
index 0000000..663f26a
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.SingleAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.List;
+
+/**
+ * Cassandra bolt that maps each input tuple to its CQL statement(s) and submits them
+ * to the async executor one tuple at a time.
+ */
+public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> {
+
+    private AsyncResultHandler<Tuple> asyncResultHandler;
+
+    /**
+     * Creates a new {@link CassandraWriterBolt} instance.
+     *
+     * @param tupleMapper the mapper used to build CQL statements from input tuples.
+     */
+    public CassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+        super(tupleMapper);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The handler is created on first access and reused afterwards.
+     */
+    @Override
+    protected AsyncResultHandler<Tuple> getAsyncHandler() {
+        if (asyncResultHandler != null) {
+            return asyncResultHandler;
+        }
+        asyncResultHandler = new SingleAsyncResultHandler(getResultHandler());
+        return asyncResultHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * A tuple mapped to exactly one statement uses the single-statement call of the executor;
+     * any other statement count is submitted as a list.
+     */
+    @Override
+    protected void process(Tuple input) {
+        final List<Statement> mapped = getMapper().map(stormConfig, session, input);
+        if (mapped.size() != 1) {
+            getAsyncExecutor().execAsync(mapped, input);
+            return;
+        }
+        getAsyncExecutor().execAsync(mapped.get(0), input);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tick() {
+        // Nothing to do on tick tuples.
+    }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
new file mode 100644
index 0000000..ea63b3d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Groups a list of statement/tuple pairs into unlogged batch statements holding at most
+ * {@code batchSizeRows} statements each. Batches are produced while iterating.
+ */
+public class GroupingBatchBuilder implements Iterable<PairBatchStatementTuples> {
+
+    private int batchSizeRows;
+
+    private List<PairStatementTuple> statements;
+
+    /**
+     * Creates a new {@link GroupingBatchBuilder} instance.
+     *
+     * @param batchSizeRows the maximum number of statements per batch.
+     * @param statements the statements, paired with their input tuples, to group.
+     */
+    public GroupingBatchBuilder(int batchSizeRows, List<PairStatementTuple> statements) {
+        this.batchSizeRows = batchSizeRows;
+        this.statements = statements;
+    }
+
+    @Override
+    public Iterator<PairBatchStatementTuples> iterator() {
+        return build().iterator();
+    }
+
+    /**
+     * Partitions the statements and maps every partition to a batch.
+     */
+    private Iterable<PairBatchStatementTuples> build() {
+        final Function<List<PairStatementTuple>, PairBatchStatementTuples> toBatch =
+                new Function<List<PairStatementTuple>, PairBatchStatementTuples>() {
+                    @Override
+                    public PairBatchStatementTuples apply(List<PairStatementTuple> group) {
+                        return assemble(group);
+                    }
+                };
+        return Iterables.transform(Iterables.partition(statements, batchSizeRows), toBatch);
+    }
+
+    /**
+     * Builds one unlogged batch from a group of pairs, keeping the tuples in the same order
+     * as their statements.
+     */
+    private static PairBatchStatementTuples assemble(List<PairStatementTuple> group) {
+        final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+        final List<Tuple> inputs = new LinkedList<>();
+        for (PairStatementTuple entry : group) {
+            batch.add(entry.getStatement());
+            inputs.add(entry.getTuple());
+        }
+        return new PairBatchStatementTuples(inputs, batch);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
new file mode 100644
index 0000000..736c482
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.BatchStatement;
+
+import java.util.List;
+
+/**
+ * Immutable holder associating one batch statement with the list of tuples it was built from.
+ */
+public class PairBatchStatementTuples {
+
+    private final BatchStatement statement;
+
+    private final List<Tuple> inputs;
+
+    /**
+     * Creates a new {@link PairBatchStatementTuples} instance.
+     *
+     * @param inputs the input tuples attached to this batch.
+     * @param statement the batch statement.
+     */
+    public PairBatchStatementTuples(List<Tuple> inputs, BatchStatement statement) {
+        this.statement = statement;
+        this.inputs = inputs;
+    }
+
+    /**
+     * @return the input tuples attached to this batch.
+     */
+    public List<Tuple> getInputs() {
+        return this.inputs;
+    }
+
+    /**
+     * @return the batch statement.
+     */
+    public BatchStatement getStatement() {
+        return this.statement;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
new file mode 100644
index 0000000..8f50574
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.tuple.Tuple;
+import com.datastax.driver.core.Statement;
+
+/**
+ * Immutable holder associating one statement with the tuple it was built from.
+ */
+public class PairStatementTuple {
+
+    private final Statement statement;
+
+    private final Tuple tuple;
+
+    /**
+     * Creates a new {@link PairStatementTuple} instance.
+     *
+     * @param tuple the input tuple.
+     * @param statement the statement paired with the tuple.
+     */
+    public PairStatementTuple(Tuple tuple, Statement statement) {
+        this.statement = statement;
+        this.tuple = tuple;
+    }
+
+    /**
+     * @return the input tuple.
+     */
+    public Tuple getTuple() {
+        return this.tuple;
+    }
+
+    /**
+     * @return the statement paired with the tuple.
+     */
+    public Statement getStatement() {
+        return this.statement;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
new file mode 100644
index 0000000..ccee468
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration used by cassandra storm components.
+ */
+public class CassandraConf implements Serializable {
+
+ public static final String CASSANDRA_USERNAME = "cassandra.username";
+ public static final String CASSANDRA_PASSWORD = "cassandra.password";
+ public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace";
+ public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.output.consistencyLevel";
+ public static final String CASSANDRA_NODES = "cassandra.nodes";
+ public static final String CASSANDRA_PORT = "cassandra.port";
+ public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows";
+
+ /**
+ * The authorized cassandra username.
+ */
+ private String username;
+ /**
+ * The authorized cassandra password
+ */
+ private String password;
+ /**
+ * The cassandra keyspace.
+ */
+ private String keyspace;
+ /**
+ * List of contacts nodes.
+ */
+ private String[] nodes = {"localhost"};
+
+ /**
+ * The port used to connect to nodes.
+ */
+ private int port = 9092;
+
+ /**
+ * Consistency level used to write statements.
+ */
+ private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+ /**
+ * The maximal numbers of rows per batch.
+ */
+ private int batchSizeRows = 100;
+
+ /**
+ * Creates a new {@link CassandraConf} instance.
+ */
+ public CassandraConf() {
+ super();
+ }
+
+ /**
+ * Creates a new {@link CassandraConf} instance.
+ *
+ * @param conf The storm configuration.
+ */
+ public CassandraConf(Map<String, Object> conf) {
+ this.username = getOrElse(conf, CASSANDRA_USERNAME, null);
+ this.password = getOrElse(conf, CASSANDRA_PASSWORD, null);
+ this.keyspace = get(conf, CASSANDRA_KEYSPACE);
+ this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+ this.nodes = getOrElse(conf, CASSANDRA_NODES, "localhost").split(",");
+ this.batchSizeRows = getOrElse(conf, CASSANDRA_BATCH_SIZE_ROWS, 100);
+ this.port = conf.get(CASSANDRA_PORT) != null ? Integer.valueOf((String)conf.get(CASSANDRA_PORT)) : 9042;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public String[] getNodes() {
+ return nodes;
+ }
+
+ public ConsistencyLevel getConsistencyLevel() {
+ return consistencyLevel;
+ }
+
+ public int getBatchSizeRows() {
+ return batchSizeRows;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ private <T> T get(Map<String, Object> conf, String key) {
+ Object o = conf.get(key);
+ if(o == null) {
+ throw new IllegalArgumentException("No '" + key + "' value found in configuration!");
+ }
+ return (T)o;
+ }
+
+ private <T> T getOrElse(Map<String, Object> conf, String key, T def) {
+ T o = (T) conf.get(key);
+ return (o == null) ? def : o;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("username", username)
+ .add("password", password)
+ .add("keyspace", keyspace)
+ .add("nodes", nodes)
+ .add("port", port)
+ .add("consistencyLevel", consistencyLevel)
+ .add("batchSizeRows", batchSizeRows)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
new file mode 100644
index 0000000..886f6d3
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link BaseBeanFactory} implementation that builds a cassandra {@link Cluster}
+ * from a storm topology configuration.
+ */
+public class ClusterFactory extends BaseBeanFactory<Cluster> {
+
+    /**
+     * Builds a new {@link Cluster} from the specified configuration.
+     * <p>
+     * The storm configuration is first parsed into a {@link CassandraConf}; its
+     * contact points, port, credentials and consistency level are then applied
+     * to the cluster builder. An auth provider is installed only when both the
+     * username and the password are non-empty.
+     *
+     * @param stormConf the storm configuration.
+     * @return a new {@link com.datastax.driver.core.Cluster} instance.
+     */
+    @Override
+    protected Cluster make(Map<String, Object> stormConf) {
+        final CassandraConf conf = new CassandraConf(stormConf);
+
+        final Cluster.Builder builder = Cluster.builder();
+        builder.withoutJMXReporting();
+        builder.withoutMetrics();
+        builder.addContactPoints(conf.getNodes());
+        builder.withPort(conf.getPort());
+        builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
+        builder.withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)));
+        builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
+
+        final String user = conf.getUsername();
+        final String secret = conf.getPassword();
+        final boolean hasCredentials = StringUtils.isNotEmpty(user) && StringUtils.isNotEmpty(secret);
+        if (hasCredentials) {
+            builder.withAuthProvider(new PlainTextAuthProvider(user, secret));
+        }
+
+        final QueryOptions queryOptions = new QueryOptions();
+        queryOptions.setConsistencyLevel(conf.getConsistencyLevel());
+        builder.withQueryOptions(queryOptions);
+
+        return builder.build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
new file mode 100644
index 0000000..3bd80ed
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Session;
+
+/**
+ * Minimal client abstraction over a cassandra {@link com.datastax.driver.core.Cluster}.
+ */
+public interface SimpleClient {
+
+ /**
+ * Creates a new session on this cluster.
+ *
+ * @return a new session on this cluster.
+ * @throws com.datastax.driver.core.exceptions.NoHostAvailableException if we cannot reach any cassandra contact points.
+ */
+ Session connect();
+
+ /**
+ * Close the underlying {@link com.datastax.driver.core.Cluster} instance.
+ */
+ void close();
+
+ /**
+ * Checks whether the underlying cluster instance is closed.
+ *
+ * @return {@code true} if the underlying cluster instance is closed.
+ */
+ boolean isClose();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
new file mode 100644
index 0000000..412cb70
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import java.util.Map;
+
+/**
+ * Provider of {@link SimpleClient} instances built from a storm configuration.
+ */
+public interface SimpleClientProvider {
+
+ /**
+ * Creates a new cassandra client based on the specified storm configuration.
+ *
+ * @param config The configuration passed to the storm topology.
+ * @return a new {@link SimpleClient} instance.
+ */
+ SimpleClient getClient(Map<String, Object> config);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
new file mode 100644
index 0000000..8ed9293
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
+ * <p>
+ * The client lazily opens a single {@link Session} bound to the configured
+ * keyspace and hands the same session back on subsequent {@link #connect()} calls.
+ */
+public class DefaultClient implements SimpleClient, Closeable, Serializable {
+
+    private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
+
+    private final String keyspace;
+
+    private final Cluster cluster;
+
+    /** Session opened by {@link #connect()}; {@code null} until the first call. */
+    private Session session;
+
+    /**
+     * Create a new {@link DefaultClient} instance.
+     *
+     * @param cluster  a cassandra cluster client; must not be {@code null}.
+     * @param keyspace the keyspace sessions are connected to.
+     * @throws NullPointerException if {@code cluster} is {@code null}.
+     */
+    public DefaultClient(Cluster cluster, String keyspace) {
+        Preconditions.checkNotNull(cluster, "Cluster cannot be 'null'");
+        this.cluster = cluster;
+        this.keyspace = keyspace;
+    }
+
+    /**
+     * @return all hosts listed in the cluster metadata.
+     */
+    public Set<Host> getAllHosts() {
+        return getMetadata().getAllHosts();
+    }
+
+    /**
+     * @return the metadata of the wrapped cluster.
+     */
+    public Metadata getMetadata() {
+        return cluster.getMetadata();
+    }
+
+    /**
+     * Returns the name of the calling thread, used to tag log messages.
+     */
+    private String getExecutorName() {
+        return Thread.currentThread().getName();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The first call opens a session on the configured keyspace; later calls
+     * return that same session. If the session has been closed in the meantime a
+     * replacement is opened on the same keyspace.
+     */
+    @Override
+    public synchronized Session connect() throws NoHostAvailableException {
+        if (isDisconnected()) {
+            LOG.info("Connected to cluster: {}", cluster.getClusterName());
+            for (Host host : getAllHosts()) {
+                LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
+            }
+
+            LOG.info("Connect to cluster using keyspace {}", keyspace);
+            session = cluster.connect(keyspace);
+        } else {
+            LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName());
+        }
+
+        if (session.isClosed()) {
+            LOG.warn("Session has been closed - create new one!");
+            // Reconnect the same way as the initial connection: Cluster.newSession()
+            // would return a session that is not bound to the configured keyspace.
+            session = cluster.connect(keyspace);
+        }
+        return session;
+    }
+
+    /**
+     * Checks whether the client has not yet opened a session on the cluster.
+     *
+     * @return {@code true} if {@link #connect()} has not created a session yet.
+     */
+    protected boolean isDisconnected() {
+        return session == null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Does nothing if the cluster is already closed. The session is closed first,
+     * when one has been opened.
+     */
+    @Override
+    public void close() {
+        if (cluster != null && !cluster.isClosed()) {
+            LOG.info("Try to close connection to cluster: {}", cluster.getClusterName());
+            // connect() may never have been called, in which case there is no session.
+            if (session != null) {
+                session.close();
+            }
+            cluster.close();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isClose() {
+        return this.cluster.isClosed();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/641300e2/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java
new file mode 100644
index 0000000..1ce6bac
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.context;
+
+import java.util.Map;
+
+/**
+ * Skeleton {@link BeanFactory} that creates its bean on first request and
+ * returns the same instance on every later request.
+ */
+public abstract class BaseBeanFactory<T> implements BeanFactory<T> {
+
+    protected WorkerCtx context;
+
+    protected volatile T instance;
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setStormContext(WorkerCtx context) {
+        this.context = context;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The bean is built through {@link #make(Map)} on the first call only; the
+     * cached instance is returned afterwards.
+     */
+    @Override
+    public synchronized T get(Map<String, Object> stormConf) {
+        if (instance == null) {
+            instance = make(stormConf);
+        }
+        return instance;
+    }
+
+    /**
+     * Return a new instance of T.
+     *
+     * @param stormConf the storm configuration handed to {@link #get(Map)}.
+     */
+    protected abstract T make(final Map<String, Object> stormConf);
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The new factory is created reflectively from this object's runtime class
+     * through its no-argument constructor and receives the same storm context.
+     */
+    @Override
+    public BeanFactory<T> newInstance() {
+        final Class<? extends BaseBeanFactory> type = this.getClass();
+        final BaseBeanFactory copy;
+        try {
+            copy = type.newInstance();
+        } catch (IllegalAccessException | InstantiationException e) {
+            throw new RuntimeException("Cannot create a new instance of " + type.getSimpleName(), e);
+        }
+        copy.setStormContext(this.context);
+        return copy;
+    }
+}
\ No newline at end of file