You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 00:45:56 UTC
[2/4] storm git commit: STORM-1369: Add MapState implementation to
storm-cassandra.
STORM-1369: Add MapState implementation to storm-cassandra.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eb6c3089
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eb6c3089
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eb6c3089
Branch: refs/heads/master
Commit: eb6c308925b2a4cd89031a2a6b951976deb6262f
Parents: b7fe860
Author: Magnus Koch <ma...@ef.com>
Authored: Mon Mar 6 11:39:34 2017 +0100
Committer: Magnus Koch <ma...@ef.com>
Committed: Mon Mar 6 11:39:34 2017 +0100
----------------------------------------------------------------------
external/storm-cassandra/README.md | 159 +++++++++---
external/storm-cassandra/pom.xml | 9 +
.../storm/cassandra/executor/AsyncExecutor.java | 167 ++++++++++++-
.../executor/AsyncExecutorProvider.java | 2 +-
.../executor/AsyncResultSetHandler.java | 58 +++++
.../query/AyncCQLResultSetValuesMapper.java | 36 +++
.../trident/state/CassandraBackingMap.java | 241 +++++++++++++++++++
.../trident/state/CassandraMapStateFactory.java | 106 ++++++++
.../trident/state/MapStateFactoryBuilder.java | 226 +++++++++++++++++
.../state/NonTransactionalTupleStateMapper.java | 64 +++++
.../trident/state/OpaqueTupleStateMapper.java | 127 ++++++++++
.../trident/state/SerializedStateMapper.java | 67 ++++++
.../trident/state/SimpleStateMapper.java | 104 ++++++++
.../cassandra/trident/state/SimpleTuple.java | 213 ++++++++++++++++
.../cassandra/trident/state/StateMapper.java | 35 +++
.../state/TransactionalTupleStateMapper.java | 105 ++++++++
.../TridentAyncCQLResultSetValuesMapper.java | 117 +++++++++
.../testtools/EmbeddedCassandraResource.java | 193 +++++++++++++++
.../storm/cassandra/trident/MapStateTest.java | 230 ++++++++++++++++++
.../src/test/resources/cassandra.yaml | 39 +++
20 files changed, 2261 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 1edf708..9f543c9 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -3,13 +3,13 @@ Storm Cassandra Integration (CQL).
[Apache Storm](https://storm.apache.org/) is a free and open source distributed realtime computation system.
-### Bolt API implementation for Apache Cassandra
+## Bolt API implementation for Apache Cassandra
This library provides core storm bolt on top of Apache Cassandra.
Provides simple DSL to map storm *Tuple* to Cassandra Query Language *Statement*.
-### Configuration
+## Configuration
The following properties may be passed to storm configuration.
| **Property name** | **Description** | **Default** |
@@ -25,17 +25,17 @@ The following properties may be passed to storm configuration.
| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
| **cassandra.reconnectionPolicy.maxDelayMs** | - | 60000 (ms) |
-### CassandraWriterBolt
+## CassandraWriterBolt
-####Static import
+### Static import
```java
import static org.apache.storm.cassandra.DynamicStatementBuilder.*
```
-#### Insert Query Builder
-##### Insert query including only the specified tuple fields.
+### Insert Query Builder
+#### Insert query including only the specified tuple fields.
```java
new CassandraWriterBolt(
@@ -48,7 +48,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
);
```
-##### Insert query including all tuple fields.
+#### Insert query including all tuple fields.
```java
new CassandraWriterBolt(
@@ -59,7 +59,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
);
```
-##### Insert multiple queries from one input tuple.
+#### Insert multiple queries from one input tuple.
```java
new CassandraWriterBolt(
@@ -70,7 +70,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
);
```
-##### Insert query using QueryBuilder
+#### Insert query using QueryBuilder
```java
new CassandraWriterBolt(
@@ -81,7 +81,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
)
```
-##### Insert query with static bound query
+#### Insert query with static bound query
```java
new CassandraWriterBolt(
@@ -92,7 +92,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
);
```
-##### Insert query with static bound query using named setters and aliases
+#### Insert query with static bound query using named setters and aliases
```java
new CassandraWriterBolt(
@@ -109,7 +109,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
);
```
-##### Insert query with bound statement load from storm configuration
+#### Insert query with bound statement load from storm configuration
```java
new CassandraWriterBolt(
@@ -117,7 +117,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
.bind(all());
```
-##### Insert query with bound statement load from tuple field
+#### Insert query with bound statement load from tuple field
```java
new CassandraWriterBolt(
@@ -125,7 +125,7 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
.bind(all());
```
-##### Insert query with batch statement
+#### Insert query with batch statement
```java
// Logged
@@ -202,34 +202,127 @@ builder.setBolt("BOLT_WRITER", bolt, 4)
.customGrouping("spout", new Murmur3StreamGrouping("title"))
```
-### Trident API support
-storm-cassandra support Trident `state` API for `inserting` data into Cassandra.
+## Trident State Support
+
+For a state factory which writes output to Cassandra, use ```CassandraStateFactory``` with an ```INSERT INTO``` statement:
+
```java
- CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+
+ // Build state
CQLStatementTupleMapper insertTemperatureValues = boundQuery(
"INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
- .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
- options.withCQLStatementTupleMapper(insertTemperatureValues);
+ .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
+ .build();
+
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext())
+ .withCQLStatementTupleMapper(insertTemperatureValues);
+
CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);
- TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
- stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
- stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
- stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+
+ // Use state in existing stream
+ stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater());
+
+```
+
+For a state factory which can query Cassandra, use ```CassandraStateFactory``` with a ```SELECT``` statement:
+
+```java
+
+ // Build state
+ CQLStatementTupleMapper selectStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+ .bind(field("weather_station_id").as("id"))
+ .build();
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext())
+ .withCQLStatementTupleMapper(selectStationName)
+ .withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+ CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
+
+ // Append query to existing stream
+ stream.stateQuery(selectWeatherStationStateFactory, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+
+```
+
+## Trident MapState Support
+
+For a MapState with Cassandra IBackingMap, the simplest option is to use a ```MapStateFactoryBuilder``` which generates CQL statements automatically.
+The builder supports opaque, transactional and non-transactional map states.
+
+To store values in Cassandra you need to provide a ```StateMapper``` that maps the value to fields.
+
+For simple values, the ```SimpleStateMapper``` can be used:
+
+```java
+ StateFactory mapState = MapStateFactoryBuilder.opaque()
+ .withTable("mykeyspace", "year_month_state")
+ .withKeys("year", "month")
+ .withStateMapper(SimpleStateMapper.opqaue("txid", "sum", "prevSum"))
+ .build();
```
-Below `state` API for `querying` data from Cassandra.
+For complex values you can either custom build a state mapper, or use binary serialization:
+
+```java
+ StateFactory mapState = MapStateFactoryBuilder.opaque()
+ .withTable("mykeyspace", "year_month_state")
+ .withKeys("year", "month")
+ .withJSONBinaryState("state")
+ .build();
+```
+
+The JSONBinary methods use the storm JSON serializers, but you can also provide custom serializers if you want.
+
+For instance, the ```NonTransactionalTupleStateMapper```, ```TransactionalTupleStateMapper``` or ```OpaqueTupleStateMapper```
+classes can be used if the map state uses tuples as values.
+
```java
- CassandraState.Options options = new CassandraState.Options(new CassandraContext());
- CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
- .bind(with(field("weather_station_id").as("id")));
- options.withCQLStatementTupleMapper(insertTemperatureValues);
- options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
- CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
- CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
- TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
- stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+ StateFactory mapState = MapStateFactoryBuilder.<ITuple>nontransactional()
+ .withTable("mykeyspace", "year_month_state")
+ .withKeys("year", "month")
+ .withStateMapper(new NonTransactionalTupleStateMapper("latest_value"))
+ .build();
```
+Alternatively, you can construct a ```CassandraMapStateFactory``` yourself:
+
+```java
+
+ CQLStatementTupleMapper get = simpleQuery("SELECT state FROM words_ks.words_table WHERE word = ?")
+ .with(fields("word"))
+ .build();
+
+ CQLStatementTupleMapper put = simpleQuery("INSERT INTO words_ks.words_table (word, state) VALUES (?, ?)")
+ .with(fields("word", "state"))
+ .build();
+
+ CassandraBackingMap.Options<Integer> mapStateOptions = new CassandraBackingMap.Options<Integer>(new CassandraContext())
+ .withBatching(BatchStatement.Type.UNLOGGED)
+ .withKeys(new Fields("word"))
+ .withNonTransactionalJSONBinaryState("state")
+ .withGetMapper(get)
+ .withPutMapper(put);
+
+ CassandraMapStateFactory factory = CassandraMapStateFactory.nonTransactional(mapStateOptions)
+ .withCache(0);
+
+```
+
+### MapState Parallelism
+
+The backing map implementation submits queries (gets and puts) in parallel to the Cassandra cluster.
+The default number of parallel requests is based on the driver configuration, which ends up being 128 with
+the default driver configuration. The maximum parallelism applies to the cluster as a whole, and to each
+state instance (per worker, not executor).
+
+The default calculation is:
+ default = min(max local, max remote) / 2
+
+which normally means:
+ min(1024, 256) / 2 = 128
+
+This is deliberately conservative to avoid issues in most setups. If this does not provide sufficient
+throughput you can either explicitly override the max parallelism on the state builder/factory/backingmap,
+or you can update the driver configuration.
+
## License
Licensed to the Apache Software Foundation (ASF) under one
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 0eff9f6..ed7ccf4 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -38,6 +38,7 @@
<guava.version>16.0.1</guava.version>
<commons-lang3.version>3.3</commons-lang3.version>
<cassandra.driver.core.version>3.1.2</cassandra.driver.core.version>
+ <cassandra.version>2.1.7</cassandra.version>
</properties>
<developers>
@@ -70,6 +71,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ <version>${cassandra.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
@@ -100,5 +108,6 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
index 5366c81..63b81fe 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
@@ -22,17 +22,22 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.storm.topology.FailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -106,6 +111,7 @@ public class AsyncExecutor<T> implements Serializable {
public SettableFuture<T> execAsync(final Statement statement, final T inputs) {
return execAsync(statement, inputs, handler);
}
+
/**
* Asynchronously executes the specified batch statement. Inputs will be passed to
* the {@link #handler} once query succeed or failed.
@@ -138,6 +144,117 @@ public class AsyncExecutor<T> implements Serializable {
}
/**
+ * Asynchronously executes the specified select statements. Results will be passed to the {@link AsyncResultSetHandler}
+ * once each query has succeeded or failed.
+ */
+    public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Semaphore throttle, final AsyncResultSetHandler<T> handler) {
+
+        final SettableFuture<List<T>> settableFuture = SettableFuture.create();
+        if (inputs.size() == 0) {
+            // Nothing to execute: complete immediately with an empty result.
+            settableFuture.set(new ArrayList<T>());
+            return settableFuture;
+        }
+
+        final AsyncContext<T> asyncContext = new AsyncContext<>(inputs, throttle, settableFuture);
+        for (int i = 0; i < statements.size(); i++) {
+
+            // Acquire a slot. A false return means an earlier statement has already failed; the
+            // context has counted this statement as done, so we simply move on to the next one.
+            if (asyncContext.acquire()) {
+                try {
+                    pending.incrementAndGet();
+                    final T input = inputs.get(i);
+                    final Statement statement = statements.get(i);
+                    ResultSetFuture future = session.executeAsync(statement);
+                    Futures.addCallback(future, new FutureCallback<ResultSet>() {
+                        @Override
+                        public void onSuccess(ResultSet result) {
+                            try {
+                                handler.success(input, result);
+                            } catch (Throwable throwable) {
+                                // A failing handler fails the whole batch.
+                                asyncContext.exception(throwable);
+                            } finally {
+                                pending.decrementAndGet();
+                                asyncContext.release();
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Throwable throwable) {
+                            try {
+                                handler.failure(throwable, input);
+                            } catch (Throwable throwable2) {
+                                asyncContext.exception(throwable2);
+                            }
+                            finally {
+                                // Record the query failure before releasing, so that the release
+                                // that completes the batch sees it.
+                                asyncContext
+                                        .exception(throwable)
+                                        .release();
+                                pending.decrementAndGet();
+                                LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable);
+                            }
+                        }
+                    }, executorService);
+                } catch (Throwable throwable) {
+                    // Submission failed synchronously. Deliberately no 'break' here: leaving the
+                    // loop would abandon the remaining statements without counting them down, and
+                    // the returned future would never complete. The recorded exception makes every
+                    // following acquire() skip (and count down) its statement instead.
+                    asyncContext.exception(throwable)
+                            .release();
+                    pending.decrementAndGet();
+                }
+            }
+
+        }
+
+        return settableFuture;
+    }
+
+    /**
+     * Bookkeeping shared by all statements of one {@code execAsync} batch: counts outstanding
+     * statements, collects failures, hands throttle permits back and completes the batch future
+     * once every statement has either finished or been skipped.
+     */
+    private static class AsyncContext<T> {
+        private final List<T> inputs;
+        private final SettableFuture<List<T>> future;
+        private final AtomicInteger latch;
+        private final List<Throwable> exceptions;
+        private final Semaphore throttle;
+
+        public AsyncContext(List<T> inputs, Semaphore throttle, SettableFuture<List<T>> settableFuture) {
+            this.inputs = inputs;
+            this.latch = new AtomicInteger(inputs.size());
+            this.throttle = throttle;
+            this.exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+            this.future = settableFuture;
+        }
+
+        /**
+         * Blocks until a throttle permit is available.
+         *
+         * @return {@code true} if the caller may submit its statement (and must later call
+         *         {@link #release()}); {@code false} if a failure has already been recorded, in
+         *         which case the statement is counted as done and the permit is returned here.
+         */
+        public boolean acquire() {
+            throttle.acquireUninterruptibly();
+            // Don't start new requests if there is an exception
+            if (exceptions.size() > 0) {
+                // The skipped statement may be the last outstanding one, so this count-down must
+                // be able to complete the future as well.
+                countDown();
+                throttle.release();
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Marks one submitted statement as finished and returns its throttle permit.
+         */
+        public AsyncContext<T> release() {
+            countDown();
+            throttle.release();
+            return this;
+        }
+
+        /**
+         * Records a failure; the batch future will complete exceptionally.
+         */
+        public AsyncContext<T> exception(Throwable throwable) {
+            this.exceptions.add(throwable);
+            return this;
+        }
+
+        /**
+         * Decrements the outstanding-statement counter and completes the future when it hits zero.
+         */
+        private void countDown() {
+            int remaining = latch.decrementAndGet();
+            if (remaining == 0) {
+                if (exceptions.size() == 0) {
+                    future.set(inputs);
+                }
+                else {
+                    future.setException(new MultiFailedException(exceptions));
+                }
+            }
+        }
+    }
+
+ /**
* Returns the number of currently executed tasks which are not yet completed.
*/
public int getPendingTasksSize() {
@@ -150,4 +267,48 @@ public class AsyncExecutor<T> implements Serializable {
this.executorService.shutdownNow();
}
}
+
+    /**
+     * A {@link FailedException} that aggregates every failure collected while executing a batch
+     * of statements. The first collected exception is used as the cause.
+     */
+    public static class MultiFailedException extends FailedException {
+        private final List<Throwable> exceptions;
+
+        /**
+         * @param exceptions the collected failures; must contain at least one element, since the
+         *                   first one is passed on as the cause.
+         */
+        public MultiFailedException(List<Throwable> exceptions) {
+            super(getMessage(exceptions), exceptions.get(0));
+            this.exceptions = exceptions;
+        }
+
+        /**
+         * Builds the exception message from the messages of (at most) the first five failures.
+         */
+        private static String getMessage(List<Throwable> exceptions) {
+            int top5 = Math.min(exceptions.size(), 5);
+            StringBuilder sb = new StringBuilder();
+            sb.append("First ")
+                .append(top5)
+                .append(" exceptions: ")
+                .append(System.lineSeparator());
+            for (int i = 0; i < top5; i++) {
+                sb.append(exceptions.get(i).getMessage())
+                    .append(System.lineSeparator());
+            }
+            return sb.toString();
+        }
+
+        /**
+         * Returns the summary message followed by the string form of every collected failure.
+         */
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append(getMessage())
+                .append(System.lineSeparator())
+                .append("Multiple exceptions encountered: ")
+                .append(System.lineSeparator());
+
+            for (Throwable exception : exceptions) {
+                sb.append(exception.toString())
+                    .append(System.lineSeparator());
+            }
+
+            // Return the text assembled above; the previous code built it and then discarded it
+            // in favour of super.toString().
+            return sb.toString();
+        }
+
+        /**
+         * @return all failures collected for the batch.
+         */
+        public List<Throwable> getExceptions() {
+            return exceptions;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
index 0c684c0..f4b7277 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java
@@ -31,7 +31,7 @@ public class AsyncExecutorProvider {
* Returns a new {@link AsyncExecutor} per storm executor.
*/
public static <T> AsyncExecutor getLocal(Session session, AsyncResultHandler<T> handler) {
- AsyncExecutor<T> executor = localAsyncExecutor.get();
+ AsyncExecutor<T> executor = localAsyncExecutor.<T>get();
if( executor == null ) {
localAsyncExecutor.set(executor = new AsyncExecutor<>(session, handler));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
new file mode 100644
index 0000000..8ccb400
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+
+import java.io.Serializable;
+
+/**
+ * Callback invoked with the outcome of an asynchronously executed statement:
+ * {@link #success} receives the input together with the statement's {@link ResultSet},
+ * {@link #failure} receives the input together with the error.
+ *
+ * @param <T> type of the input object associated with each executed statement.
+ */
+public interface AsyncResultSetHandler<T> extends Serializable {
+
+ /** A handler that ignores both successes and failures. */
+ public static final AsyncResultSetHandler NO_OP_HANDLER = new AsyncResultSetHandler() {
+ @Override
+ public void failure(Throwable t, Object inputs) {
+ /** no-operation **/
+ }
+
+ @Override
+ public void success(Object inputs, ResultSet resultSet) {
+ /** no-operation **/
+ }
+
+ };
+
+ /**
+ * This method is responsible for failing the specified inputs.
+ *
+ * @param t The cause of the failure.
+ * @param inputs The input that was being processed.
+ */
+ void failure(Throwable t, T inputs);
+
+ /**
+ * This method is responsible for acknowledging the specified inputs.
+ *
+ * @param inputs The input that was processed.
+ * @param resultSet The result set returned for the statement executed for these inputs.
+ */
+ void success(T inputs, ResultSet resultSet) ;
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
new file mode 100644
index 0000000..9b92b99
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A resultset mapper that takes a whole batch of statements, together with the tuples they
+ * were built from, and maps their results to lists of {@link Values}.
+ *
+ * NOTE(review): presumably the returned list holds one entry per statement, in statement
+ * order, and implementations execute the statements asynchronously (hence the name) --
+ * confirm against the implementing classes.
+ */
+public interface AyncCQLResultSetValuesMapper extends Serializable {
+
+ List<List<Values>> map(Session session, List<Statement> statements, List<ITuple> tuples);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
new file mode 100644
index 0000000..82f3b9c
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.IBackingMap;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+/**
+ * An IBackingMap implementation for Cassandra.
+ *
+ * State is converted to and from {@link Values} by a {@link StateMapper}; the convenience
+ * methods on {@link Options} store it as a binary blob using a {@link Serializer}.
+ * It supports Opaque, Transactional and NonTransactional states, given a matching serializer.
+ *
+ * Configuration is done with three separate constructs:
+ *  - One tuple mapper for multiGet, which should map keys to a select statement and return {@link Values}.
+ *  - One state mapper, which maps the state to/from a {@link Values} representation, which is used for binding.
+ *  - One tuple mapper for multiPut, which should map {@link Values} to an INSERT or UPDATE statement.
+ *
+ * {@link #multiPut(List, List)} updates Cassandra with parallel statements.
+ * {@link #multiGet(List)} queries Cassandra with parallel statements.
+ *
+ * {@link Options} starts with a maximum parallelism of 128. If it is explicitly set to null or a
+ * non-positive value, {@link #prepare()} derives it from the driver instead: half the maximum
+ * requests per host, either local or remote whichever is lower.
+ *
+ * @param <T> the type of state stored in the map.
+ */
+public class CassandraBackingMap<T> implements IBackingMap<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraBackingMap.class);
+
+    private final Map conf;
+    private final Options<T> options;
+    /** Key fields followed by the state mapper's state fields; used to build multiPut tuples. */
+    private final Fields allFields;
+
+    private SimpleClient client;
+    private Session session;
+    private AyncCQLResultSetValuesMapper getResultMapper;
+    private AyncCQLResultSetValuesMapper putResultMapper;
+    private Semaphore throttle;
+
+
+    /**
+     * @param conf    the configuration handed to the client provider and the statement mappers.
+     * @param options the backing map options; key fields and state mapper must be set.
+     */
+    protected CassandraBackingMap(Map conf, Options<T> options) {
+        // Fail with a descriptive message instead of a bare NullPointerException below.
+        Preconditions.checkNotNull(options.keyFields, "CassandraBackingMap.Options should have keyFields");
+        Preconditions.checkNotNull(options.stateMapper, "CassandraBackingMap.Options should have stateMapper");
+        this.conf = conf;
+        this.options = options;
+        List<String> allFields = options.keyFields.toList();
+        allFields.addAll(options.stateMapper.getStateFields().toList());
+        this.allFields = new Fields(allFields);
+    }
+
+    /**
+     * Connects to Cassandra, resolves the maximum parallelism and creates the result mappers
+     * used by {@link #multiGet(List)} and {@link #multiPut(List, List)}.
+     */
+    public void prepare() {
+        LOG.info("Preparing state for {}", options);
+        Preconditions.checkNotNull(options.getMapper, "CassandraBackingMap.Options should have getMapper");
+        Preconditions.checkNotNull(options.putMapper, "CassandraBackingMap.Options should have putMapper");
+        client = options.clientProvider.getClient(conf);
+        session = client.connect();
+        if (options.maxParallelism == null || options.maxParallelism <= 0) {
+            // No usable explicit value: use half of the smaller (local vs. remote) per-host request limit.
+            PoolingOptions po = session.getCluster().getConfiguration().getPoolingOptions();
+            Integer maxRequestsPerHost = Math.min(
+                po.getMaxConnectionsPerHost(HostDistance.LOCAL) * po.getMaxRequestsPerConnection(HostDistance.LOCAL),
+                po.getMaxConnectionsPerHost(HostDistance.REMOTE) * po.getMaxRequestsPerConnection(HostDistance.REMOTE)
+            );
+            options.maxParallelism = maxRequestsPerHost / 2;
+            LOG.info("Parallelism default set to {}", options.maxParallelism);
+        }
+        // Gets and puts share one semaphore.
+        throttle = new Semaphore(options.maxParallelism, false);
+        this.getResultMapper = new TridentAyncCQLResultSetValuesMapper(options.stateMapper.getStateFields(), throttle);
+        this.putResultMapper = new TridentAyncCQLResultSetValuesMapper(null, throttle);
+    }
+
+    /**
+     * Looks up the state for each of the given keys.
+     *
+     * @param keys the keys to fetch; each key holds the values of the configured key fields.
+     * @return the states produced by the state mapper from the query results.
+     * @throws IllegalArgumentException if the get mapper produces more than one statement for a key.
+     */
+    @Override
+    public List<T> multiGet(List<List<Object>> keys) {
+        LOG.debug("multiGet fetching {} values.", keys.size());
+        List<Statement> selects = new ArrayList<>();
+        List<ITuple> keyTuples = new ArrayList<>();
+
+        for (int i = 0; i < keys.size(); i++) {
+            SimpleTuple keyTuple = new SimpleTuple(options.keyFields, keys.get(i));
+            List<Statement> mappedStatements = options.getMapper.map(conf, session, keyTuple);
+            if (mappedStatements.size() > 1) {
+                throw new IllegalArgumentException("Only one statement per map state item is supported.");
+            }
+            // Keep statements and key tuples index-aligned even when no statement was produced.
+            selects.add(mappedStatements.size() == 1 ? mappedStatements.get(0) : null);
+            keyTuples.add(keyTuple);
+        }
+
+        List<List<Values>> results = getResultMapper
+            .map(session, selects, keyTuples);
+
+        List<T> states = new ArrayList<>();
+        for (List<Values> values : results) {
+            T state = (T) options.stateMapper.fromValues(values);
+            states.add(state);
+        }
+
+        return states;
+
+    }
+
+    /**
+     * Writes the given states to Cassandra.
+     *
+     * @param keys   the keys to write; each key holds the values of the configured key fields.
+     * @param values the states to store, index-aligned with {@code keys}.
+     * @throws FailedException if the write fails.
+     */
+    @Override
+    public void multiPut(List<List<Object>> keys, List<T> values) {
+        LOG.debug("multiPut writing {} values.", keys.size());
+
+        List<Statement> statements = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i++) {
+            Values stateValues = options.stateMapper.toValues(values.get(i));
+            SimpleTuple tuple = new SimpleTuple(allFields, keys.get(i), stateValues);
+            statements.addAll(options.putMapper.map(conf, session, tuple));
+        }
+
+        try {
+            putResultMapper.map(session, statements, null);
+        } catch (Exception e) {
+            LOG.warn("Write operation failed: {}", e.getMessage());
+            throw new FailedException(e);
+        }
+    }
+
+    /**
+     * Serializable configuration for a {@link CassandraBackingMap}.
+     *
+     * @param <T> the type of state stored in the map.
+     */
+    public static final class Options<T> implements Serializable {
+        private final SimpleClientProvider clientProvider;
+        private Fields keyFields;
+        private StateMapper stateMapper;
+        private CQLStatementTupleMapper getMapper;
+        private CQLStatementTupleMapper putMapper;
+        private Integer maxParallelism = 128;
+
+        public Options(SimpleClientProvider clientProvider) {
+            this.clientProvider = clientProvider;
+        }
+
+        /** Sets the fields that make up a key. */
+        public Options<T> withKeys(Fields keyFields) {
+            this.keyFields = keyFields;
+            return this;
+        }
+
+        /** Sets the mapper that converts the state to and from {@link Values}. */
+        public Options<T> withStateMapper(StateMapper<T> stateMapper) {
+            this.stateMapper = stateMapper;
+            return this;
+        }
+
+        /** Stores the state in the named field using {@link JSONNonTransactionalSerializer}. */
+        public Options<T> withNonTransactionalJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONNonTransactionalSerializer());
+            return this;
+        }
+
+        /** Stores the state in the named field using the given serializer. */
+        public Options<T> withNonTransactionalBinaryState(String fieldName, Serializer<T> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        /** Stores the state in the named field using {@link JSONTransactionalSerializer}. */
+        public Options<T> withTransactionalJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONTransactionalSerializer());
+            return this;
+        }
+
+        /** Stores the state in the named field using the given serializer. */
+        public Options<T> withTransactionalBinaryState(String fieldName, Serializer<TransactionalValue<T>> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        /** Stores the state in the named field using {@link JSONOpaqueSerializer}. */
+        public Options<T> withOpaqueJSONBinaryState(String fieldName) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, new JSONOpaqueSerializer());
+            return this;
+        }
+
+        /** Stores the state in the named field using the given serializer. */
+        public Options<T> withOpaqueBinaryState(String fieldName, Serializer<OpaqueValue<T>> serializer) {
+            this.stateMapper = new SerializedStateMapper<>(fieldName, serializer);
+            return this;
+        }
+
+        /** Sets the mapper that builds the statement used by multiGet for a key tuple. */
+        public Options<T> withGetMapper(CQLStatementTupleMapper getMapper) {
+            this.getMapper = getMapper;
+            return this;
+        }
+
+        /** Sets the mapper that builds the statement(s) used by multiPut for a key + state tuple. */
+        public Options<T> withPutMapper(CQLStatementTupleMapper putMapper) {
+            this.putMapper = putMapper;
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of parallel requests. A null or non-positive value makes
+         * {@link CassandraBackingMap#prepare()} derive the value from the driver's pooling options.
+         */
+        public Options<T> withMaxParallelism(Integer maxParallelism) {
+            this.maxParallelism = maxParallelism;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            // The closing bracket was missing from the original format string.
+            return String.format("%s: [keys: %s, StateMapper: %s, getMapper: %s, putMapper: %s, maxParallelism: %d]",
+                this.getClass().getSimpleName(),
+                keyFields,
+                stateMapper,
+                getMapper,
+                putMapper,
+                maxParallelism
+            );
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
new file mode 100644
index 0000000..abd9477
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraMapStateFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.CachedMap;
+import org.apache.storm.trident.state.map.IBackingMap;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.NonTransactionalMap;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
+
+import java.util.Map;
+
+/**
+ * {@link StateFactory} that produces {@link MapState} instances persisted through a
+ * {@link CassandraBackingMap}.
+ *
+ * Instances are obtained from {@link #opaque}, {@link #transactional} or {@link #nonTransactional},
+ * which select the state type. Calling {@link #withCache} with a positive size puts a
+ * {@link CachedMap} in front of the Cassandra-backed map; with the default size of 0 no cache is used.
+ */
+public class CassandraMapStateFactory implements StateFactory {
+
+    private final StateType stateType;
+    private final CassandraBackingMap.Options options;
+    private int cacheSize;
+    private Map cassandraConfig;
+
+    private CassandraMapStateFactory(StateType stateType, CassandraBackingMap.Options options, Map cassandraConfig) {
+        this.stateType = stateType;
+        this.options = options;
+        this.cassandraConfig = cassandraConfig;
+    }
+
+    /** Creates a factory whose states are wrapped with {@link OpaqueMap}. */
+    public static CassandraMapStateFactory opaque(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.OPAQUE, options, cassandraConfig);
+    }
+
+    /** Creates a factory whose states are wrapped with {@link TransactionalMap}. */
+    public static CassandraMapStateFactory transactional(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.TRANSACTIONAL, options, cassandraConfig);
+    }
+
+    /** Creates a factory whose states are wrapped with {@link NonTransactionalMap}. */
+    public static CassandraMapStateFactory nonTransactional(CassandraBackingMap.Options options, Map cassandraConfig) {
+        return new CassandraMapStateFactory(StateType.NON_TRANSACTIONAL, options, cassandraConfig);
+    }
+
+    /**
+     * Sets the size of the optional cache layer.
+     *
+     * @param cacheSize cache size; values of 0 or less leave the cache disabled
+     * @return this factory, for call chaining
+     */
+    public CassandraMapStateFactory withCache(int cacheSize) {
+        this.cacheSize = cacheSize;
+        return this;
+    }
+
+    /**
+     * Builds and prepares a new {@link CassandraBackingMap}, optionally fronts it with a cache,
+     * and wraps the result in the map implementation matching the configured state type.
+     *
+     * @throws IllegalArgumentException if the state type is not one of the three handled values
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        CassandraBackingMap store = new CassandraBackingMap(cassandraConfig, options);
+        store.prepare();
+
+        IBackingMap backingMap;
+        if (cacheSize > 0) {
+            backingMap = new CachedMap<>(store, cacheSize);
+        } else {
+            backingMap = store;
+        }
+
+        switch (stateType) {
+            case OPAQUE:
+                return OpaqueMap.build((IBackingMap<OpaqueValue>) backingMap);
+            case TRANSACTIONAL:
+                return TransactionalMap.build((IBackingMap<TransactionalValue>) backingMap);
+            case NON_TRANSACTIONAL:
+                return NonTransactionalMap.build(backingMap);
+            default:
+                throw new IllegalArgumentException("Invalid state provided " + stateType);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
new file mode 100644
index 0000000..c371fdb
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/MapStateFactoryBuilder.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.Select;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+
+/**
+ * A helper for building a MapState backed by Cassandra. It internalizes some common
+ * implementation choices to simplify usage.
+ *
+ * In the simplest use case, a map state can be constructed with:
+ *
+ * <pre>{@code
+ * StateFactory mapState = MapStateFactoryBuilder.opaque(cassandraConf)
+ *     .withTable("mykeyspace", "year_month_state")
+ *     .withKeys("year", "month")
+ *     .withJSONBinaryState("state")
+ *     .build();
+ * }</pre>
+ *
+ * for a cassandra table with:
+ * <pre>
+ * mykeyspace.year_month_state {
+ *     year: int,
+ *     month: int,
+ *     state: blob
+ * }
+ * </pre>
+ *
+ * This will use the storm JSON serializers to convert the state to and from binary format.
+ * Other binary serializers can be used with the {@link #withBinaryState(String, Serializer)} method.
+ *
+ * Storing state in explicit fields (e.g. in a field "sum" of type int) is possible by instead calling
+ * {@link #withStateMapper(StateMapper)}. For instance, you can use {@link NonTransactionalTupleStateMapper},
+ * {@link TransactionalTupleStateMapper} or {@link OpaqueTupleStateMapper} if your state values are tuples.
+ *
+ * @param <T> the type of the values stored in the map state
+ */
+public class MapStateFactoryBuilder<T> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MapStateFactoryBuilder.class);
+
+    private String keyspace;
+    private String table;
+    private String[] keys;
+    private Integer maxParallelism;
+    private StateType stateType;
+    private StateMapper<T> stateMapper;
+    private Map cassandraConfig;
+    private int cacheSize;
+
+    /** Starts a builder for an opaque map state using the given Cassandra configuration. */
+    public static <U> MapStateFactoryBuilder<OpaqueValue<U>> opaque(Map cassandraConf) {
+        return new MapStateFactoryBuilder<OpaqueValue<U>>()
+                .withStateType(StateType.OPAQUE)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /** Starts a builder for a transactional map state using the given Cassandra configuration. */
+    public static <U> MapStateFactoryBuilder<TransactionalValue<U>> transactional(Map cassandraConf) {
+        return new MapStateFactoryBuilder<TransactionalValue<U>>()
+                .withStateType(StateType.TRANSACTIONAL)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /** Starts a builder for a non-transactional map state using the given Cassandra configuration. */
+    public static <U> MapStateFactoryBuilder<U> nontransactional(Map cassandraConf) {
+        return new MapStateFactoryBuilder<U>()
+                .withStateType(StateType.NON_TRANSACTIONAL)
+                .withCassandraConfig(cassandraConf);
+    }
+
+    /** Sets the keyspace and table that the generated SELECT and INSERT statements target. */
+    public MapStateFactoryBuilder<T> withTable(String keyspace, String table) {
+        this.keyspace = keyspace;
+        this.table = table;
+        return this;
+    }
+
+    /** Sets the key column names; each one becomes an equality condition of the generated SELECT. */
+    public MapStateFactoryBuilder<T> withKeys(String... keys) {
+        this.keys = keys;
+        return this;
+    }
+
+    /** Sets the value later handed to {@code CassandraBackingMap.Options#withMaxParallelism}. */
+    public MapStateFactoryBuilder<T> withMaxParallelism(Integer maxParallelism) {
+        this.maxParallelism = maxParallelism;
+        return this;
+    }
+
+    /**
+     * Stores the state in a single field, serialized with the storm JSON serializer that matches
+     * the builder's state type.
+     *
+     * @param stateField name of the field holding the serialized state
+     * @return this builder
+     * @throws NullPointerException if no state type has been set on this builder
+     * @throws IllegalArgumentException if the state type is not one of the three handled values
+     */
+    @SuppressWarnings("unchecked")
+    public MapStateFactoryBuilder<T> withJSONBinaryState(String stateField) {
+        // A builder created through the implicit public constructor has no state type yet;
+        // fail with a descriptive message instead of a bare NPE from the switch.
+        Objects.requireNonNull(stateType, "A state type must be specified.");
+        switch (stateType) {
+            case OPAQUE:
+                return withBinaryState(stateField, (Serializer) new JSONOpaqueSerializer());
+            case TRANSACTIONAL:
+                return withBinaryState(stateField, (Serializer) new JSONTransactionalSerializer());
+            case NON_TRANSACTIONAL:
+                return withBinaryState(stateField, new JSONNonTransactionalSerializer());
+            default:
+                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
+        }
+    }
+
+    /** Sets the mapper that converts state values to and from the table's state fields. */
+    public MapStateFactoryBuilder<T> withStateMapper(StateMapper<T> stateMapper) {
+        this.stateMapper = stateMapper;
+        return this;
+    }
+
+    /** Stores the state in the single field {@code stateField}, serialized with {@code serializer}. */
+    public MapStateFactoryBuilder<T> withBinaryState(String stateField, Serializer<T> serializer) {
+        return withStateMapper(new SerializedStateMapper<>(stateField, serializer));
+    }
+
+    protected MapStateFactoryBuilder<T> withStateType(StateType stateType) {
+        this.stateType = stateType;
+        return this;
+    }
+
+    protected MapStateFactoryBuilder<T> withCassandraConfig(Map cassandraConf) {
+        this.cassandraConfig = cassandraConf;
+        return this;
+    }
+
+    /** Sets the cache size passed on to {@link CassandraMapStateFactory#withCache(int)}. */
+    public MapStateFactoryBuilder<T> withCache(int cacheSize) {
+        this.cacheSize = cacheSize;
+        return this;
+    }
+
+    /**
+     * Validates the configuration, derives the SELECT and INSERT statements from the key and
+     * state fields, and creates the {@link CassandraMapStateFactory} for the configured state type.
+     *
+     * @return the configured state factory, never {@code null}
+     * @throws NullPointerException if keyspace, table, keys, state mapper or state type is missing
+     * @throws IllegalArgumentException if no key is given or the state type is not handled
+     */
+    public StateFactory build() {
+
+        Objects.requireNonNull(keyspace, "A keyspace is required.");
+        Objects.requireNonNull(table, "A table name is required.");
+        Objects.requireNonNull(keys, "At least one key must be specified.");
+        if (keys.length == 0) {
+            throw new IllegalArgumentException("At least one key must be specified.");
+        }
+        Objects.requireNonNull(stateMapper, "A state mapper must be specified.");
+        Objects.requireNonNull(stateType, "A state type must be specified.");
+
+        List<String> stateFields = stateMapper.getStateFields()
+                .toList();
+
+        String[] stateFieldsArray = stateFields.toArray(new String[stateFields.size()]);
+
+        // The INSERT covers the key columns first, then the state columns.
+        List<String> allFields = new ArrayList<>();
+        Collections.addAll(allFields, keys);
+        allFields.addAll(stateFields);
+
+        // Build get query
+        Select.Where getQuery = select(stateFieldsArray)
+                .from(keyspace, table)
+                .where();
+
+        for (String key : keys) {
+            getQuery.and(eq(key, bindMarker()));
+        }
+
+        CQLStatementTupleMapper get = boundQuery(getQuery.toString())
+                .bind(all())
+                .build();
+
+        // Build put query
+        Insert putStatement = insertInto(keyspace, table)
+                .values(allFields, Collections.<Object>nCopies(allFields.size(), bindMarker()));
+
+        CQLStatementTupleMapper put = boundQuery(putStatement.toString())
+                .bind(all())
+                .build();
+
+        // NOTE(review): maxParallelism is null unless withMaxParallelism was called, and that null
+        // is forwarded as-is; confirm that CassandraBackingMap.Options tolerates it.
+        CassandraBackingMap.Options options = new CassandraBackingMap.Options<T>(new CassandraContext())
+                .withGetMapper(get)
+                .withPutMapper(put)
+                .withStateMapper(stateMapper)
+                .withKeys(new Fields(keys))
+                .withMaxParallelism(maxParallelism);
+
+        // Pass the objects themselves so toString() only runs when debug logging is enabled.
+        logger.debug("Building factory with: \n get: {}\n put: {}\n mapper: {}",
+                getQuery,
+                putStatement,
+                stateMapper);
+
+        switch (stateType) {
+            case NON_TRANSACTIONAL:
+                return CassandraMapStateFactory.nonTransactional(options, cassandraConfig)
+                        .withCache(cacheSize);
+            case TRANSACTIONAL:
+                return CassandraMapStateFactory.transactional(options, cassandraConfig)
+                        .withCache(cacheSize);
+            case OPAQUE:
+                return CassandraMapStateFactory.opaque(options, cassandraConfig)
+                        .withCache(cacheSize);
+            default:
+                // Fail loudly rather than hand callers a null factory.
+                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
new file mode 100644
index 0000000..3a36b07
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/NonTransactionalTupleStateMapper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * State mapper that maps a tuple to separate state fields.
+ */
+public class NonTransactionalTupleStateMapper implements StateMapper<ITuple> {
+
+    private final Fields fields;
+
+    public NonTransactionalTupleStateMapper(String... fields) {
+        this.fields = new Fields(fields);
+    }
+
+    public NonTransactionalTupleStateMapper(Fields fields) {
+        this.fields = fields;
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return fields;
+    }
+
+    /**
+     * Flattens the tuple into one value per tuple entry, in tuple order.
+     *
+     * @param t the state tuple to store
+     * @return the tuple's values as a {@link Values} list
+     */
+    @Override
+    public Values toValues(ITuple t) {
+        // Values only offers an Object... constructor; passing the List directly would yield a
+        // single-element Values containing the list itself, so spread it as an array.
+        return new Values(t.getValues().toArray());
+    }
+
+    /**
+     * Rebuilds the state tuple from the first result row.
+     *
+     * @param values the result rows; may be null or empty
+     * @return a tuple over the state fields, or null when there is no row
+     */
+    @Override
+    public ITuple fromValues(List<Values> values) {
+        if (values == null || values.isEmpty()) {
+            return null;
+        }
+        return new SimpleTuple(fields, values.get(0));
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), fields);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
new file mode 100644
index 0000000..882c9b1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/OpaqueTupleStateMapper.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * State mapper for opaque tuple state: the current transaction id, the current tuple and the
+ * previous tuple are each spread over their own state fields.
+ *
+ * The state fields are laid out as: the txid field, then every tuple field prefixed with the
+ * "current" prefix, then every tuple field prefixed with the "previous" prefix.
+ */
+public class OpaqueTupleStateMapper implements StateMapper<OpaqueValue<ITuple>> {
+
+    private final Fields tupleFields;
+    private final Fields tableFields;
+
+    public OpaqueTupleStateMapper(String currTxIdField, String currPrefix, String prevPrefix, String... fields) {
+        this(currTxIdField, currPrefix, prevPrefix, new Fields(fields));
+    }
+
+    public OpaqueTupleStateMapper(String currTxIdField, String currPrefix, String prevPrefix, Fields fields) {
+        tupleFields = fields;
+        List<String> columns = new ArrayList<>();
+        columns.add(currTxIdField);
+        addPrefixed(columns, currPrefix, fields);
+        addPrefixed(columns, prevPrefix, fields);
+        tableFields = new Fields(columns);
+    }
+
+    /** Appends {@code prefix + field} to {@code target} for every field, in field order. */
+    private static void addPrefixed(List<String> target, String prefix, Fields fields) {
+        for (String name : fields) {
+            target.add(prefix + name);
+        }
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return tableFields;
+    }
+
+    /**
+     * Produces the txid followed by the current tuple's values and then the previous tuple's
+     * values; a missing current or previous tuple contributes one null per tuple field.
+     */
+    @Override
+    public Values toValues(OpaqueValue<ITuple> tuple) {
+        Values row = new Values();
+        row.add(tuple.getCurrTxid());
+        appendFieldValues(row, tuple.getCurr());
+        appendFieldValues(row, tuple.getPrev());
+        return row;
+    }
+
+    /** Adds one entry per tuple field to {@code row}: the field's value, or null when there is no source. */
+    private void appendFieldValues(Values row, ITuple source) {
+        for (String name : tupleFields) {
+            row.add(source == null ? null : source.getValueByField(name));
+        }
+    }
+
+    /**
+     * Reads the first result row back into an opaque value. A current or previous tuple whose
+     * values are all null is represented as null.
+     *
+     * @return the opaque value, or null when there is no row
+     */
+    @Override
+    public OpaqueValue<ITuple> fromValues(List<Values> valuesList) {
+        if (valuesList == null || valuesList.isEmpty()) {
+            return null;
+        }
+        Values row = valuesList.get(0);
+        Long currTx = (Long) row.get(0);
+
+        int width = tupleFields.size();
+        SimpleTuple curr = readTuple(row, 1);
+        SimpleTuple prev = readTuple(row, 1 + width);
+
+        return new OpaqueValue<ITuple>(currTx, curr, prev);
+    }
+
+    /**
+     * Builds a tuple from consecutive row entries starting at {@code offset}, one per tuple field.
+     *
+     * @return the tuple, or null if every entry read was null
+     */
+    private SimpleTuple readTuple(Values row, int offset) {
+        SimpleTuple result = new SimpleTuple(tupleFields);
+        boolean sawValue = false;
+        int position = offset;
+        for (String name : tupleFields) {
+            Object cell = row.get(position++);
+            result.put(name, cell);
+            if (cell != null) {
+                sawValue = true;
+            }
+        }
+        return sawValue ? result : null;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s}", this.getClass().getSimpleName(), tableFields);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
new file mode 100644
index 0000000..b4ec6c8
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SerializedStateMapper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * State mapper that keeps the whole state value in one binary field, converting it with a
+ * {@link Serializer}.
+ *
+ * @param <T> the type of the state value
+ */
+public class SerializedStateMapper<T> implements StateMapper<T> {
+
+    private final Fields stateFields;
+    private final Serializer<T> serializer;
+
+    public SerializedStateMapper(String fieldName, Serializer<T> serializer) {
+        this.stateFields = new Fields(fieldName);
+        this.serializer = serializer;
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return stateFields;
+    }
+
+    /**
+     * Serializes the value and wraps the bytes in a {@link ByteBuffer}.
+     *
+     * @param value the state value to store
+     * @return a single-entry {@link Values} holding the serialized bytes
+     */
+    @Override
+    public Values toValues(T value) {
+        byte[] serialized = serializer.serialize(value);
+        return new Values(ByteBuffer.wrap(serialized));
+    }
+
+    /**
+     * Deserializes the state from the first entry of the single result row.
+     *
+     * @param values the result rows; may be null or empty
+     * @return the deserialized value, or null when there is no row or the field is null
+     * @throws IllegalArgumentException if more than one row is supplied
+     */
+    @Override
+    public T fromValues(List<Values> values) {
+        if (values == null || values.isEmpty()) {
+            return null;
+        }
+        if (values.size() > 1) {
+            throw new IllegalArgumentException("Can only convert single values, " + values.size() + " encountered");
+        }
+        ByteBuffer bytes = (ByteBuffer) values.get(0).get(0);
+        if (bytes == null) {
+            return null;
+        }
+        // Copy exactly the readable region. ByteBuffer.array() would expose the whole backing
+        // array regardless of position/limit/offset and throws for read-only buffers.
+        // duplicate() leaves the caller's buffer position untouched.
+        byte[] raw = new byte[bytes.remaining()];
+        bytes.duplicate().get(raw);
+        return serializer.deserialize(raw);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{type: %s, fields: %s, serializer: %s}", this.getClass().getSimpleName(), stateFields, serializer);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
new file mode 100644
index 0000000..cc03a09
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleStateMapper.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class SimpleStateMapper<T> implements StateMapper<T> {
+
+ private final Fields fields;
+ private final StateType stateType;
+
+ public SimpleStateMapper(Fields fields, StateType stateType) {
+ this.fields = fields;
+ this.stateType = stateType;
+ }
+
+ public static <U> StateMapper<OpaqueValue<U>> opaque(String txIdField, String previousField, String field) {
+ return new SimpleStateMapper<>(new Fields(txIdField, field, previousField), StateType.OPAQUE);
+ }
+
+ public static <U> StateMapper<TransactionalValue<U>> opaque(String txIdField, String field) {
+ return new SimpleStateMapper<>(new Fields(txIdField, field), StateType.TRANSACTIONAL);
+ }
+
+ public static <U> StateMapper<U> nontransactional(String field) {
+ return new SimpleStateMapper<>(new Fields(field), StateType.NON_TRANSACTIONAL);
+ }
+
+ @Override
+ public Fields getStateFields() {
+ return fields;
+ }
+
+ @Override
+ public Values toValues(T value) {
+ if (value == null) {
+ return null;
+ }
+ switch (stateType) {
+ case NON_TRANSACTIONAL:
+ return new Values(value);
+ case TRANSACTIONAL:
+ TransactionalValue transactional = (TransactionalValue) value;
+ return new Values(transactional.getTxid(), transactional.getVal());
+ case OPAQUE:
+ OpaqueValue opaque = (OpaqueValue) value;
+ return new Values(opaque.getCurrTxid(), opaque.getCurr(), opaque.getPrev());
+ default:
+ throw new IllegalStateException("Unknown state type " + stateType);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T fromValues(List<Values> valuesSet) {
+ if (valuesSet == null || valuesSet.size() == 0) {
+ return null;
+ }
+ else if (valuesSet.size() == 1) {
+ Values values = valuesSet.get(0);
+ if (values == null) {
+ return null;
+ }
+ switch (stateType) {
+ case NON_TRANSACTIONAL:
+ return (T) values.get(0);
+ case TRANSACTIONAL:
+ return (T) new TransactionalValue((Long) values.get(0), values.get(1));
+ case OPAQUE:
+ return (T) new OpaqueValue((Long) values.get(0), values.get(1), values.get(2));
+ default:
+ throw new IllegalStateException("Unknown state type " + stateType);
+ }
+ }
+ throw new IllegalStateException("State query returned multiple results.");
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{type: %s, fields: %s, stateType: %s}", this.getClass().getSimpleName(), fields, stateType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
new file mode 100644
index 0000000..d78f6d7
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/SimpleTuple.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class for passing around ordered key/value data with an immutable key set.
+ *
+ * The value list always holds at least one slot per key; missing values are null.
+ */
+public class SimpleTuple implements ITuple, Serializable {
+
+    private static final long serialVersionUID = -4656331293513898312L;
+
+    private final List<String> keys;
+    private List<Object> values;
+
+    /**
+     * Creates a tuple over {@code keyFields} holding a copy of {@code values}, padded with
+     * nulls up to the number of keys.
+     */
+    public SimpleTuple(Fields keyFields, List<Object> values) {
+        this.keys = keyFields.toList();
+        this.values = new ArrayList<>(values);
+        padValues();
+    }
+
+    /**
+     * Creates a tuple over {@code keyFields} whose values are the concatenation of the given
+     * lists, padded with nulls up to the number of keys.
+     */
+    @SafeVarargs // the array is only read, never stored or written to
+    public SimpleTuple(Fields keyFields, List<Object>... values) {
+        this.keys = keyFields.toList();
+        this.values = new ArrayList<>();
+        for (List<Object> valueList : values) {
+            this.values.addAll(valueList);
+        }
+        padValues();
+    }
+
+    /** Appends nulls until there is one value slot per key. */
+    private void padValues() {
+        while (values.size() < keys.size()) {
+            values.add(null);
+        }
+    }
+
+    /**
+     * Sets the value of an existing field.
+     *
+     * @throws IllegalArgumentException if the field is not part of this tuple
+     */
+    public SimpleTuple put(String key, Object value) {
+        int index = keys.indexOf(key);
+        if (index < 0) {
+            throw new IllegalArgumentException("Field " + key + " does not exist.");
+        }
+        values.set(index, value);
+        return this;
+    }
+
+    /**
+     * Replaces all values with a copy of the given list, padded with nulls up to the number
+     * of keys (as the constructors do), so that later field access stays in bounds.
+     */
+    public SimpleTuple setValues(List<Object> values) {
+        this.values = new ArrayList<>(values);
+        padValues();
+        return this;
+    }
+
+    @Override
+    public int size() {
+        return keys.size();
+    }
+
+    @Override
+    public boolean contains(String field) {
+        return keys.contains(field);
+    }
+
+    @Override
+    public Fields getFields() {
+        return new Fields(keys);
+    }
+
+    /** Returns the position of the field, or -1 if it is not part of this tuple. */
+    @Override
+    public int fieldIndex(String field) {
+        return keys.indexOf(field);
+    }
+
+    @Override
+    public List<Object> select(Fields selector) {
+        List<Object> selected = new ArrayList<>();
+        for (String field : selector) {
+            selected.add(getValueByField(field));
+        }
+        return selected;
+    }
+
+    @Override
+    public Object getValue(int i) {
+        return values.get(i);
+    }
+
+    @Override
+    public String getString(int i) {
+        return (String) values.get(i);
+    }
+
+    @Override
+    public Integer getInteger(int i) {
+        return (Integer) values.get(i);
+    }
+
+    @Override
+    public Long getLong(int i) {
+        return (Long) values.get(i);
+    }
+
+    @Override
+    public Boolean getBoolean(int i) {
+        return (Boolean) values.get(i);
+    }
+
+    @Override
+    public Short getShort(int i) {
+        return (Short) values.get(i);
+    }
+
+    @Override
+    public Byte getByte(int i) {
+        return (Byte) values.get(i);
+    }
+
+    @Override
+    public Double getDouble(int i) {
+        return (Double) values.get(i);
+    }
+
+    @Override
+    public Float getFloat(int i) {
+        return (Float) values.get(i);
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        return (byte[]) values.get(i);
+    }
+
+    /**
+     * Returns the value stored for the given field.
+     *
+     * @throws IllegalArgumentException if the field is not part of this tuple
+     */
+    @Override
+    public Object getValueByField(String field) {
+        int index = keys.indexOf(field);
+        if (index < 0) {
+            // Same failure mode as put(), instead of an opaque index -1 error.
+            throw new IllegalArgumentException("Field " + field + " does not exist.");
+        }
+        return values.get(index);
+    }
+
+    @Override
+    public String getStringByField(String field) {
+        return (String) getValueByField(field);
+    }
+
+    @Override
+    public Integer getIntegerByField(String field) {
+        return (Integer) getValueByField(field);
+    }
+
+    @Override
+    public Long getLongByField(String field) {
+        return (Long) getValueByField(field);
+    }
+
+    @Override
+    public Boolean getBooleanByField(String field) {
+        return (Boolean) getValueByField(field);
+    }
+
+    @Override
+    public Short getShortByField(String field) {
+        return (Short) getValueByField(field);
+    }
+
+    @Override
+    public Byte getByteByField(String field) {
+        return (Byte) getValueByField(field);
+    }
+
+    @Override
+    public Double getDoubleByField(String field) {
+        return (Double) getValueByField(field);
+    }
+
+    @Override
+    public Float getFloatByField(String field) {
+        return (Float) getValueByField(field);
+    }
+
+    @Override
+    public byte[] getBinaryByField(String field) {
+        return (byte[]) getValueByField(field);
+    }
+
+    /** Returns a read-only view of the values, in key order. */
+    @Override
+    public List<Object> getValues() {
+        return Collections.unmodifiableList(values);
+    }
+
+    /** Returns a read-only view of the keys. */
+    public List<String> getKeys() {
+        return Collections.unmodifiableList(keys);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
new file mode 100644
index 0000000..ef0c783
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/StateMapper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable contract for converting a state value of type {@code T} to and from
+ * {@link Values}.
+ *
+ * @param <T> type of the state value handled by the mapper
+ */
+public interface StateMapper<T> extends Serializable {
+
+ /**
+ * Returns the fields used to hold the state.
+ * NOTE(review): presumably these line up positionally with the output of
+ * {@link #toValues(Object)} - confirm against the implementations.
+ */
+ Fields getStateFields();
+
+ /**
+ * Converts a state value into a {@link Values} list.
+ *
+ * @param value the state value to convert
+ * @return the values representing {@code value}
+ */
+ Values toValues(T value);
+
+ /**
+ * Builds a state value from a list of {@link Values}.
+ *
+ * @param values the value lists to convert
+ * @return the state value built from {@code values}
+ */
+ T fromValues(List<Values> values);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
new file mode 100644
index 0000000..83332b9
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TransactionalTupleStateMapper.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * State mapper that spreads a transactional tuple over separate state fields: the transaction
+ * id field comes first, followed by the configured tuple fields.
+ */
+public class TransactionalTupleStateMapper implements StateMapper<TransactionalValue<ITuple>> {
+
+    private final Fields tupleFields;
+    private final Fields tableFields;
+
+    /**
+     * Convenience constructor taking the tuple field names as varargs.
+     *
+     * @param txIdField name of the field that holds the transaction id
+     * @param fields names of the tuple fields
+     */
+    public TransactionalTupleStateMapper(String txIdField, String... fields) {
+        this(txIdField, new Fields(fields));
+    }
+
+    /**
+     * Creates a mapper whose state fields are {@code txIdField} followed by every entry of {@code fields}.
+     *
+     * @param txIdField name of the field that holds the transaction id
+     * @param fields the tuple fields
+     */
+    public TransactionalTupleStateMapper(String txIdField, Fields fields) {
+        List<String> columns = new ArrayList<>();
+        columns.add(txIdField);
+        for (String name : fields) {
+            columns.add(name);
+        }
+        this.tupleFields = fields;
+        this.tableFields = new Fields(columns);
+    }
+
+    @Override
+    public Fields getStateFields() {
+        return tableFields;
+    }
+
+    /**
+     * Flattens a transactional value into the transaction id followed by one entry per tuple
+     * field; every tuple-field entry is {@code null} when the wrapped tuple is {@code null}.
+     */
+    @Override
+    public Values toValues(TransactionalValue<ITuple> tuple) {
+        Values row = new Values();
+        row.add(tuple.getTxid());
+
+        ITuple payload = tuple.getVal();
+        for (String name : tupleFields) {
+            row.add(payload == null ? null : payload.getValueByField(name));
+        }
+
+        return row;
+    }
+
+    /**
+     * Rebuilds a transactional value from the first entry of {@code valuesList}.
+     *
+     * @return {@code null} when the list is {@code null} or empty; otherwise a value carrying the
+     *         transaction id and the restored tuple, or a {@code null} tuple when every restored
+     *         value is {@code null}
+     */
+    @Override
+    public TransactionalValue<ITuple> fromValues(List<Values> valuesList) {
+        if (valuesList == null || valuesList.isEmpty()) {
+            return null;
+        }
+
+        Values row = valuesList.get(0);
+        Long txId = (Long) row.get(0);
+
+        // Position 0 is the transaction id; the tuple fields follow in declaration order.
+        SimpleTuple restored = new SimpleTuple(tupleFields);
+        int column = 1;
+        for (String name : tupleFields) {
+            restored.put(name, row.get(column));
+            column++;
+        }
+
+        ITuple payload = hasNonNullValue(restored) ? restored : null;
+        return new TransactionalValue<ITuple>(txId, payload);
+    }
+
+    /** Reports whether at least one value of the tuple is not {@code null}. */
+    private static boolean hasNonNullValue(SimpleTuple tuple) {
+        for (Object value : tuple.getValues()) {
+            if (value != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        String typeName = this.getClass().getSimpleName();
+        return String.format("{type: %s, fields: %s}", typeName, tableFields);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eb6c3089/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
new file mode 100644
index 0000000..0d03fed
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentAyncCQLResultSetValuesMapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.AsyncResultSetHandler;
+import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A result set mapper implementation which runs requests in parallel and waits for them all to finish.
+ */
+public class TridentAyncCQLResultSetValuesMapper implements AyncCQLResultSetValuesMapper {
+    private final Fields outputDeclaredFields;
+    private final Semaphore throttle;
+
+    /**
+     * Creates the mapper.
+     *
+     * @param outputDeclaredFields fields to emit for every result row; when {@code null},
+     *                             {@link #map} records no rows
+     * @param throttle semaphore handed to the executor together with the statements
+     */
+    public TridentAyncCQLResultSetValuesMapper(Fields outputDeclaredFields, Semaphore throttle) {
+        this.outputDeclaredFields = outputDeclaredFields;
+        this.throttle = throttle;
+    }
+
+    /**
+     * Submits all statements through the session's {@link AsyncExecutor} and blocks until the
+     * returned future completes.
+     *
+     * @param session session used to obtain the executor
+     * @param statements statements to execute
+     * @param tuples input tuples, looked up by the index of the statement they belong to
+     * @return a list with one slot per statement, in statement order; a slot is {@code null}
+     *         unless the success callback stored rows for that statement
+     * @throws FailedException if waiting for the results fails or the waiting thread is interrupted
+     */
+    @Override
+    public List<List<Values>> map(Session session, List<Statement> statements, final List<ITuple> tuples) {
+        AsyncExecutor<Integer> executor = AsyncExecutorProvider.getLocal(session, AsyncResultHandler.NO_OP_HANDLER);
+        final List<Integer> indexes = new ArrayList<>();
+        final List<List<Values>> results = new ArrayList<>();
+        for (int i = 0; i < statements.size(); i++) {
+            indexes.add(i);
+            results.add(null);
+        }
+        SettableFuture<List<Integer>> result = executor.execAsync(statements, indexes, throttle, new AsyncResultSetHandler<Integer>() {
+            @Override
+            public void success(Integer index, ResultSet resultSet) {
+                if (outputDeclaredFields != null) {
+                    // Reuse the shared row conversion instead of duplicating it here.
+                    results.set(index, handleResult(resultSet, tuples.get(index)));
+                }
+            }
+
+            @Override
+            public void failure(Throwable t, Integer index) {
+                // Exceptions are captured and thrown at the end of the batch by the executor
+            }
+
+        });
+
+        try {
+            // Await all results
+            result.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new FailedException(e.getMessage(), e);
+        } catch (Exception e) {
+            throw new FailedException(e.getMessage(), e);
+        }
+
+        return results;
+    }
+
+    /**
+     * Converts every row of a result set into a {@link Values} holding one entry per declared
+     * output field. An entry comes from the input tuple when the tuple contains the field,
+     * otherwise from the row column of the same name.
+     *
+     * @param resultSet rows returned for one statement
+     * @param tuple input tuple the statement was created for
+     * @return one {@link Values} per row, in iteration order
+     */
+    protected List<Values> handleResult(ResultSet resultSet, ITuple tuple) {
+        List<Values> list = new ArrayList<>();
+        for (Row row : resultSet) {
+            final Values values = new Values();
+            for (String field : outputDeclaredFields) {
+                if (tuple.contains(field)) {
+                    values.add(tuple.getValueByField(field));
+                } else {
+                    values.add(row.getObject(field));
+                }
+            }
+            list.add(values);
+        }
+        return list;
+    }
+
+}