You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:31:05 UTC
[1/6] storm git commit: STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector
Repository: storm
Updated Branches:
refs/heads/master df32755b1 -> 989cae6ec
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 843bf6b..2593327 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -21,9 +21,15 @@ package org.apache.storm.cassandra;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.collect.Maps;
import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
import org.cassandraunit.CassandraCQLUnit;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.Assert;
@@ -31,20 +37,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
import static org.mockito.Mockito.when;
public class DynamicStatementBuilderTest {
@@ -64,78 +68,92 @@ public class DynamicStatementBuilderTest {
when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
}
+ public static final String QUERY_STRING = "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?,?,?);";
+
@Test
public void shouldBuildMultipleStaticInsertStatementGivenKeyspaceAndAllMapper() {
- executeStatementAndAssert(
- async(
- insertInto("weather", "temperature").values(all()),
- insertInto("weather", "temperature").values(all())
- ),
- "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
- "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
+ CQLStatementTupleMapper mapper = async(
+ simpleQuery(QUERY_STRING).with(all()),
+ simpleQuery(QUERY_STRING).with(all())
);
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(2, stmts.size());
+ makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
+ makeAssertSimpleStatement(QUERY_STRING, stmts.get(1));
}
@Test
public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
- executeStatementAndAssert(insertInto("weather", "temperature").values(all()).build(),
- "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(1, stmts.size());
+ makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
+ }
+
+ @Test
+ public void shouldBuildStaticInsertStatementUsingBuilderGivenKeyspaceAndAllMapper() {
+ Insert insert = QueryBuilder.insertInto("weather", "temperature")
+ .value("weather_station_id", "?")
+ .value("event_time", "?")
+ .value("temperature", "?");
+ SimpleCQLStatementMapper mapper = simpleQuery(insert).with(all()).build();
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(1, stmts.size());
+ makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
}
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
- executeStatementAndAssert(insertInto("temperature").values(all()).build(),
- "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(1, stmts.size());
+ makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
}
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
- executeStatementAndAssert(insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature"))).build(),
- "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")).build();
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(1, stmts.size());
+ makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
}
@Test
public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
- executeBatchStatementAndAssert(loggedBatch(
- insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
- ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(1, stmts.size());
+ makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
}
@Test
public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
- executeBatchStatementAndAssert(unLoggedBatch(
- insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
- ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
- }
-
- private void executeBatchStatementAndAssert(CQLStatementTupleMapper mapper, String... results) {
- List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-
- BatchStatement statement = (BatchStatement) map.get(0);
- Collection<Statement> statements = statement.getStatements();
- Assert.assertEquals(results.length, statements.size());
-
- for (Statement s : statements)
- Assert.assertTrue(Arrays.asList(results).contains(s.toString()));
- }
-
-
- private void executeStatementAndAssert(CQLStatementTupleMapper mapper, String... expected) {
- List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-
- List<String> listExpected = Arrays.asList(expected);
- for (int i = 0; i < map.size(); i++) {
- Assert.assertEquals(listExpected.get(i), map.get(i).toString());
- }
+ BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+ List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
+ Assert.assertEquals(1, stmts.size());
+ makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
}
@Test
public void shouldBuildStaticBoundStatement() {
- CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weather_station_id, event_time, temperature) VALUES(?, ?, ?)")
- .bind(with(field("weather_station_id"), field("event_time").now(), field("temperature")));
+ CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field("weather_station_id"), field("event_time").now(), field("temperature")));
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Statement statement = map.get(0);
Assert.assertNotNull(statement);
}
+ public void makeAssertSimpleStatement(String expected, Statement actual) {
+ Assert.assertTrue(actual instanceof SimpleStatement);
+ Assert.assertEquals(expected, actual.toString());
+ Assert.assertEquals(3, ((SimpleStatement)actual).getValues(ProtocolVersion.V3).length);
+ }
+
+ public void makeAssertBatchStatement(String expected, Statement actual) {
+ Assert.assertTrue(actual instanceof BatchStatement);
+ Collection<Statement> statements = ((BatchStatement) actual).getStatements();
+ for(Statement s : statements) {
+ Assert.assertEquals(expected, s.toString());
+ Assert.assertEquals(3, ((SimpleStatement)s).getValues(ProtocolVersion.V3).length);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 80eb95a..9a61309 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -21,14 +21,13 @@ package org.apache.storm.cassandra.bolt;
import backtype.storm.topology.TopologyBuilder;
import com.datastax.driver.core.ResultSet;
import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
/**
@@ -45,8 +44,9 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
}
- private SimpleCQLStatementTupleMapper getInsertInto() {
- return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
+ private SimpleCQLStatementMapper getInsertInto() {
+ return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
+ .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index 2aef53c..e70b6b0 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -21,14 +21,13 @@ package org.apache.storm.cassandra.bolt;
import backtype.storm.topology.TopologyBuilder;
import com.datastax.driver.core.ResultSet;
import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
/**
*
@@ -44,8 +43,9 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
}
- private SimpleCQLStatementTupleMapper getInsertInto() {
- return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
+ private SimpleCQLStatementMapper getInsertInto() {
+ return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
+ .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
[6/6] storm git commit: Added STORM-1348 to Changelog
Posted by bo...@apache.org.
Added STORM-1348 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/989cae6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/989cae6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/989cae6e
Branch: refs/heads/master
Commit: 989cae6eccbedef127bf5db56961c040f3fee14d
Parents: fa1ec6f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 11 14:20:22 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 11 14:20:22 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/989cae6e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e544a95..4652993 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1348: refactor API to remove Insert/Update builder in Cassandra connector
* STORM-1206: Reduce logviewer memory usage through directory stream
* STORM-1219: Fix HDFS and Hive bolt flush/acking
* STORM-1150: Fix the authorization of Logviewer in method authorized-log-user?
[4/6] storm git commit: STORM-1348 - Fix tests added with STORM-1211
Posted by bo...@apache.org.
STORM-1348 - Fix tests added with STORM-1211
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0702f54d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0702f54d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0702f54d
Branch: refs/heads/master
Commit: 0702f54d72e1d18411fc24c9019e3e88b7d35e81
Parents: 054a7f3
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Mon Dec 14 11:43:19 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:22 2015 +0100
----------------------------------------------------------------------
.../apache/storm/cassandra/trident/TridentTopologyTest.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0702f54d/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
index c775919..bdacfe4 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
@@ -45,7 +45,6 @@ import java.util.Random;
import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
/**
*
@@ -109,7 +108,8 @@ public class TridentTopologyTest extends BaseTopologyTest {
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper insertTemperatureValues = boundQuery(
"INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
- .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+ .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
+ .build();
options.withCQLStatementTupleMapper(insertTemperatureValues);
return new CassandraStateFactory(options);
}
@@ -117,7 +117,7 @@ public class TridentTopologyTest extends BaseTopologyTest {
public CassandraStateFactory getSelectWeatherStationStateFactory() {
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
- .bind(with(field("weather_station_id").as("id")));
+ .bind(field("weather_station_id").as("id")).build();
options.withCQLStatementTupleMapper(insertTemperatureValues);
options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
return new CassandraStateFactory(options);
[3/6] storm git commit: STORM-1348 - clean imports
Posted by bo...@apache.org.
STORM-1348 - clean imports
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/054a7f38
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/054a7f38
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/054a7f38
Branch: refs/heads/master
Commit: 054a7f38685d88f491666ae658591ab377179d6e
Parents: c2997cf
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Tue Dec 8 17:44:33 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:21 2015 +0100
----------------------------------------------------------------------
.../cassandra/DynamicStatementBuilder.java | 9 +++++--
.../storm/cassandra/Murmur3StreamGrouping.java | 2 --
.../query/CQLStatementTupleMapper.java | 6 -----
.../builder/BoundCQLStatementMapperBuilder.java | 7 ++++--
.../SimpleCQLStatementMapperBuilder.java | 3 ++-
.../query/impl/PreparedStatementBinder.java | 11 +++++++--
.../cassandra/query/selector/FieldSelector.java | 2 --
.../cassandra/DynamicStatementBuilderTest.java | 26 +++++++++++---------
.../bolt/BatchCassandraWriterBoltTest.java | 4 +--
.../cassandra/bolt/CassandraWriterBoltTest.java | 4 +--
10 files changed, 42 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index ce53ea3..4a98632 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -20,14 +20,19 @@ package org.apache.storm.cassandra;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.querybuilder.BuiltStatement;
-import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+import org.apache.storm.cassandra.query.CqlMapper;
import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
import org.apache.storm.cassandra.query.selector.FieldSelector;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
public class DynamicStatementBuilder implements Serializable {
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
index 966aacd..992bfd0 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -26,8 +26,6 @@ import backtype.storm.tuple.Fields;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
index b100d9a..fc960dd 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -21,20 +21,14 @@ package org.apache.storm.cassandra.query;
import backtype.storm.tuple.ITuple;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.google.common.base.Optional;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
-
/**
* Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
- *
*/
public interface CQLStatementTupleMapper extends Serializable {
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
index 52502b0..a144830 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
@@ -18,14 +18,17 @@
*/
package org.apache.storm.cassandra.query.builder;
-import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.ContextQuery;
+import org.apache.storm.cassandra.query.CqlMapper;
import org.apache.storm.cassandra.query.impl.BoundCQLStatementTupleMapper;
import org.apache.storm.cassandra.query.impl.PreparedStatementBinder;
import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
import org.apache.storm.cassandra.query.selector.FieldSelector;
import java.io.Serializable;
-import java.util.*;
+import java.util.Arrays;
+import java.util.List;
import static org.apache.storm.cassandra.query.ContextQuery.*;
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
index 401f1bf..29ad9c1 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
@@ -19,7 +19,8 @@
package org.apache.storm.cassandra.query.builder;
import com.datastax.driver.core.querybuilder.BuiltStatement;
-import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.CQLStatementBuilder;
+import org.apache.storm.cassandra.query.CqlMapper;
import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
import org.apache.storm.cassandra.query.selector.FieldSelector;
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
index 7a6c78d..4606b05 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -18,7 +18,10 @@
*/
package org.apache.storm.cassandra.query.impl;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.TupleValue;
+import com.datastax.driver.core.UDTValue;
import org.apache.storm.cassandra.query.Column;
import java.io.Serializable;
@@ -26,7 +29,11 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
/**
*
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
index 8ce651e..bba7fb5 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -23,8 +23,6 @@ import com.datastax.driver.core.utils.UUIDs;
import org.apache.storm.cassandra.query.Column;
import java.io.Serializable;
-import java.util.Map;
-
public class FieldSelector implements Serializable {
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 2593327..800cad7 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -57,15 +57,19 @@ public class DynamicStatementBuilderTest {
private static final Tuple mockTuple;
+ public static final String COL_WEATHER_STATION_ID = "weather_station_id";
+ public static final String COL_EVENT_TIME = "event_time";
+ public static final String COL_TEMPERATURE = "temperature";
+
@Rule
public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
static {
mockTuple = Mockito.mock(Tuple.class);
- when(mockTuple.getValueByField("weather_station_id")).thenReturn("1");
- when(mockTuple.getValueByField("event_time")).thenReturn(NOW);
- when(mockTuple.getValueByField("temperature")).thenReturn("0°C");
- when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
+ when(mockTuple.getValueByField(COL_WEATHER_STATION_ID)).thenReturn("1");
+ when(mockTuple.getValueByField(COL_EVENT_TIME)).thenReturn(NOW);
+ when(mockTuple.getValueByField(COL_TEMPERATURE)).thenReturn("0°C");
+ when(mockTuple.getFields()).thenReturn(new Fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE));
}
public static final String QUERY_STRING = "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?,?,?);";
@@ -93,9 +97,9 @@ public class DynamicStatementBuilderTest {
@Test
public void shouldBuildStaticInsertStatementUsingBuilderGivenKeyspaceAndAllMapper() {
Insert insert = QueryBuilder.insertInto("weather", "temperature")
- .value("weather_station_id", "?")
- .value("event_time", "?")
- .value("temperature", "?");
+ .value(COL_WEATHER_STATION_ID, "?")
+ .value(COL_EVENT_TIME, "?")
+ .value(COL_TEMPERATURE, "?");
SimpleCQLStatementMapper mapper = simpleQuery(insert).with(all()).build();
List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Assert.assertEquals(1, stmts.size());
@@ -112,7 +116,7 @@ public class DynamicStatementBuilderTest {
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
- SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")).build();
+ SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)).build();
List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Assert.assertEquals(1, stmts.size());
makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
@@ -120,7 +124,7 @@ public class DynamicStatementBuilderTest {
@Test
public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
- BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+ BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Assert.assertEquals(1, stmts.size());
makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
@@ -128,7 +132,7 @@ public class DynamicStatementBuilderTest {
@Test
public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
- BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields("weather_station_id", "event_time", "temperature")));
+ BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Assert.assertEquals(1, stmts.size());
makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
@@ -136,7 +140,7 @@ public class DynamicStatementBuilderTest {
@Test
public void shouldBuildStaticBoundStatement() {
- CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field("weather_station_id"), field("event_time").now(), field("temperature")));
+ CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field(COL_WEATHER_STATION_ID), field(COL_EVENT_TIME).now(), field(COL_TEMPERATURE)));
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Statement statement = map.get(0);
Assert.assertNotNull(statement);
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 9a61309..c5116e2 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -45,8 +45,8 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
}
private SimpleCQLStatementMapper getInsertInto() {
- return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
- .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
+ return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
+ .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
http://git-wip-us.apache.org/repos/asf/storm/blob/054a7f38/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index e70b6b0..3d1b623 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -44,8 +44,8 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
}
private SimpleCQLStatementMapper getInsertInto() {
- return simpleQuery("INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES (?, ?, ?);")
- .with(field("weatherstation_id"), field("event_time").now(), field("temperature")).build();
+ return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
+ .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
[5/6] storm git commit: Merge branch 'master' of
https://github.com/fhussonnois/storm into STORM-1348
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/fhussonnois/storm into STORM-1348
STORM-1348: refactor API to remove Insert/Update builder in Cassandra connector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa1ec6f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa1ec6f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa1ec6f7
Branch: refs/heads/master
Commit: fa1ec6f7e2efb1b3dc85b67fd7341d5583f2ad1d
Parents: df32755 0702f54
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 11 14:19:45 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 11 14:19:45 2016 -0600
----------------------------------------------------------------------
external/storm-cassandra/README.md | 70 ++++++---
.../cassandra/DynamicStatementBuilder.java | 99 ++++--------
.../storm/cassandra/Murmur3StreamGrouping.java | 2 -
.../query/BaseCQLStatementTupleMapper.java | 51 +++++++
.../query/BatchStatementTupleMapper.java | 57 -------
.../cassandra/query/CQLClauseTupleMapper.java | 36 -----
.../cassandra/query/CQLStatementBuilder.java | 4 +-
.../query/CQLStatementTupleMapper.java | 25 ---
.../cassandra/query/CQLTableTupleMapper.java | 39 -----
.../cassandra/query/CQLValuesTupleMapper.java | 74 ---------
.../apache/storm/cassandra/query/Column.java | 74 +++++++++
.../apache/storm/cassandra/query/CqlMapper.java | 86 +++++++++++
.../query/SimpleCQLStatementTupleMapper.java | 51 -------
.../builder/BoundCQLStatementMapperBuilder.java | 106 +++++++++++++
.../SimpleCQLStatementMapperBuilder.java | 92 +++++++++++
.../impl/BatchCQLStatementTupleMapper.java | 58 +++++++
.../impl/BoundCQLStatementTupleMapper.java | 106 +++++++++++++
.../query/impl/BoundStatementMapperBuilder.java | 109 -------------
.../query/impl/InsertStatementBuilder.java | 153 -------------------
.../query/impl/PreparedStatementBinder.java | 143 +++++++++++++++++
.../query/impl/RoutingKeyGenerator.java | 52 +++++++
.../query/impl/SimpleCQLStatementMapper.java | 88 +++++++++++
.../query/impl/UpdateStatementBuilder.java | 118 --------------
.../cassandra/query/selector/FieldSelector.java | 8 +-
.../cassandra/DynamicStatementBuilderTest.java | 120 +++++++++------
.../bolt/BatchCassandraWriterBoltTest.java | 10 +-
.../cassandra/bolt/CassandraWriterBoltTest.java | 10 +-
.../cassandra/trident/TridentTopologyTest.java | 6 +-
28 files changed, 1020 insertions(+), 827 deletions(-)
----------------------------------------------------------------------
[2/6] storm git commit: STORM-1348 - refactor API to remove
Insert/Update builder in Cassandra connector
Posted by bo...@apache.org.
STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2997cf9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2997cf9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2997cf9
Branch: refs/heads/master
Commit: c2997cf9f116297812505451269568484196b4c0
Parents: 6d7b968
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Sun Dec 6 22:23:29 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:19 2015 +0100
----------------------------------------------------------------------
external/storm-cassandra/README.md | 70 ++++++---
.../cassandra/DynamicStatementBuilder.java | 90 +++--------
.../query/BaseCQLStatementTupleMapper.java | 51 +++++++
.../query/BatchStatementTupleMapper.java | 57 -------
.../cassandra/query/CQLClauseTupleMapper.java | 36 -----
.../cassandra/query/CQLStatementBuilder.java | 4 +-
.../query/CQLStatementTupleMapper.java | 19 ---
.../cassandra/query/CQLTableTupleMapper.java | 39 -----
.../cassandra/query/CQLValuesTupleMapper.java | 74 ---------
.../apache/storm/cassandra/query/Column.java | 74 +++++++++
.../apache/storm/cassandra/query/CqlMapper.java | 86 +++++++++++
.../query/SimpleCQLStatementTupleMapper.java | 51 -------
.../builder/BoundCQLStatementMapperBuilder.java | 103 +++++++++++++
.../SimpleCQLStatementMapperBuilder.java | 91 +++++++++++
.../impl/BatchCQLStatementTupleMapper.java | 58 +++++++
.../impl/BoundCQLStatementTupleMapper.java | 106 +++++++++++++
.../query/impl/BoundStatementMapperBuilder.java | 109 -------------
.../query/impl/InsertStatementBuilder.java | 153 -------------------
.../query/impl/PreparedStatementBinder.java | 136 +++++++++++++++++
.../query/impl/RoutingKeyGenerator.java | 52 +++++++
.../query/impl/SimpleCQLStatementMapper.java | 88 +++++++++++
.../query/impl/UpdateStatementBuilder.java | 118 --------------
.../cassandra/query/selector/FieldSelector.java | 6 +-
.../cassandra/DynamicStatementBuilderTest.java | 108 +++++++------
.../bolt/BatchCassandraWriterBoltTest.java | 10 +-
.../cassandra/bolt/CassandraWriterBoltTest.java | 10 +-
26 files changed, 991 insertions(+), 808 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 943fe5e..1edf708 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -39,17 +39,24 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
```java
new CassandraWriterBolt(
- insertInto("album")
- .values(
- with(fields("title", "year", "performer", "genre", "tracks")
- ).build());
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with(
+ fields("title", "year", "performer", "genre", "tracks")
+ )
+ )
+ );
```
+
##### Insert query including all tuple fields.
```java
new CassandraWriterBolt(
- insertInto("album")
- .values(all()).build());
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with( all() )
+ )
+ );
```
##### Insert multiple queries from one input tuple.
@@ -57,32 +64,49 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
new CassandraWriterBolt(
async(
- insertInto("titles_per_album").values(all()),
- insertInto("titles_per_performer").values(all())
+ simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+ simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
)
);
```
-##### Insert query including some fields with aliases
+##### Insert query using QueryBuilder
```java
new CassandraWriterBolt(
- insertInto("album")
- .values(
- with(field("ti"),as("title",
- field("ye").as("year")),
- field("pe").as("performer")),
- field("ge").as("genre")),
- field("tr").as("tracks")),
- ).build());
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with(all())
+ )
+ );
```
##### Insert query with static bound query
```java
new CassandraWriterBolt(
- boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);")
- .bind(all());
+ async(
+ boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .bind(all())
+ )
+ );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+ new CassandraWriterBolt(
+ async(
+ boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
+ .bind(
+ field("ti").as("title"),
+ field("ye").as("year"),
+ field("pe").as("performer"),
+ field("ge").as("genre"),
+ field("tr").as("tracks")
+ ).byNamedSetters()
+ )
+ );
```
##### Insert query with bound statement load from storm configuration
@@ -106,14 +130,14 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
// Logged
new CassandraWriterBolt(loggedBatch(
- insertInto("title_per_album").values(all())
- insertInto("title_per_performer").values(all())
+ simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+ simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
)
);
// UnLogged
new CassandraWriterBolt(unLoggedBatch(
- insertInto("title_per_album").values(all())
- insertInto("title_per_performer").values(all())
+ simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+ simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
)
);
```
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index deea8da..ce53ea3 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -19,10 +19,11 @@
package org.apache.storm.cassandra;
import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
import org.apache.storm.cassandra.query.*;
-import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder;
-import org.apache.storm.cassandra.query.impl.InsertStatementBuilder;
-import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
+import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
import org.apache.storm.cassandra.query.selector.FieldSelector;
import java.io.Serializable;
@@ -33,73 +34,32 @@ public class DynamicStatementBuilder implements Serializable {
private DynamicStatementBuilder() {
}
- /**
- * Builds a new insert statement for the specified table.
- *
- * @param table the table's name.
- * @return a new {@link InsertStatementBuilder} instance.
- */
- public static final InsertStatementBuilder insertInto(String table) {
- return new InsertStatementBuilder(table);
- }
- /**
- * Builds a new insert statement based on the specified CQL mapper.
- *
- * @param mapper the CQL mapper.
- * @return a new {@link InsertStatementBuilder} instance.
- */
- public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) {
- return new InsertStatementBuilder(mapper);
- }
- /**
- * Builds a new insert statement for the specified keyspace and table.
- *
- * @param ks the keyspace to use.
- * @param table the table's name.
- * @return a new {@link InsertStatementBuilder} instance.
- */
- public static final InsertStatementBuilder insertInto(String ks, String table) {
- return new InsertStatementBuilder(table, ks);
+ public static final SimpleCQLStatementMapperBuilder simpleQuery(String queryString) {
+ return new SimpleCQLStatementMapperBuilder(queryString);
}
- /**
- * Builds a new update statement for the specified table.
- *
- * @param table the table's name.
- * @return a new {@link UpdateStatementBuilder} instance.
- */
- public static final UpdateStatementBuilder update(String table) {
- return new UpdateStatementBuilder(table);
- }
-
- /**
- * Builds a new update statement for the specified keyspace and table.
- *
- * @param table the table's name.
- * @return a new {@link UpdateStatementBuilder} instance.
- */
- public static final UpdateStatementBuilder update(String ks, String table) {
- return new UpdateStatementBuilder(table, ks);
+ public static final SimpleCQLStatementMapperBuilder simpleQuery(BuiltStatement builtStatement) {
+ return new SimpleCQLStatementMapperBuilder(builtStatement);
}
/**
* Builds a new bound statement based on the specified query.
*
* @param cql the query.
- * @return a new {@link BoundStatementMapperBuilder} instance.
+ * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
*/
- public static final BoundStatementMapperBuilder boundQuery(String cql) {
- return new BoundStatementMapperBuilder(cql);
+ public static final BoundCQLStatementMapperBuilder boundQuery(String cql) {
+ return new BoundCQLStatementMapperBuilder(cql);
}
/**
* Builds a new bound statement identified by the given field.
*
* @param field a context used to resolve the cassandra query.
- * @return a new {@link BoundStatementMapperBuilder} instance.
+ * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
*/
- public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) {
- return new BoundStatementMapperBuilder(field);
+ public static final BoundCQLStatementMapperBuilder boundQuery(ContextQuery field) {
+ return new BoundCQLStatementMapperBuilder(field);
}
/**
@@ -115,27 +75,27 @@ public class DynamicStatementBuilder implements Serializable {
/**
* Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders.
*/
- public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
+ public static final BatchCQLStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
}
/**
* Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
*/
- public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
+ public static final BatchCQLStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
}
/**
* Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders.
*/
- public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
+ public static final BatchCQLStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders);
}
- private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
+ private static BatchCQLStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length);
for(CQLStatementBuilder b : Arrays.asList(builders))
mappers.add(b.build());
- return new BatchStatementTupleMapper(type, mappers);
+ return new BatchCQLStatementTupleMapper(type, mappers);
}
/**
@@ -181,19 +141,11 @@ public class DynamicStatementBuilder implements Serializable {
return fl.toArray(new FieldSelector[size]);
}
- /**
- * Includes only the specified tuple fields.
- *
- * @param fields a list of field selector.
- */
- public static final CQLValuesTupleMapper with(final FieldSelector... fields) {
- return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields));
- }
/**
* Includes all tuple fields.
*/
- public static final CQLValuesTupleMapper all() {
- return new CQLValuesTupleMapper.AllTupleMapper();
+ public static final CqlMapper.DefaultCqlMapper all() {
+ return new CqlMapper.DefaultCqlMapper();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
new file mode 100644
index 0000000..c9ba6fa
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for mapping a {@link backtype.storm.tuple.ITuple} to a single CQL {@link com.datastax.driver.core.Statement}.
+ *
+ */
+public abstract class BaseCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Statement> map(Map conf, Session session, ITuple tuple) {
+ return Arrays.asList(map(tuple));
+ }
+
+ /**
+ * Maps a given tuple to a single CQL statement.
+ *
+ * @param tuple the incoming tuple to map.
+ * @return the {@link com.datastax.driver.core.Statement} built from the tuple.
+ */
+ public abstract Statement map(ITuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
deleted file mode 100644
index 32ff1de..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-
-public class BatchStatementTupleMapper implements CQLStatementTupleMapper {
-
- private final List<CQLStatementTupleMapper> mappers;
- private final BatchStatement.Type type;
-
- /**
- * Creates a new {@link BatchStatementTupleMapper} instance.
- * @param type
- * @param mappers
- */
- public BatchStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
- this.mappers = new ArrayList<>(mappers);
- this.type = type;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Statement> map(Map conf, Session session, ITuple tuple) {
- final BatchStatement batch = new BatchStatement(this.type);
- for(CQLStatementTupleMapper m : mappers)
- batch.addAll(m.map(conf, session, tuple));
- return Arrays.asList((Statement)batch);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
deleted file mode 100644
index baa4685..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.querybuilder.Clause;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to
- * a list of cassandra {@link com.datastax.driver.core.querybuilder.Clause}s.
- *
- */
-public interface CQLClauseTupleMapper extends Serializable {
-
- List<Clause> map(ITuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
index 6c04d1c..9ddb58a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
@@ -21,11 +21,11 @@ package org.apache.storm.cassandra.query;
import java.io.Serializable;
-public interface CQLStatementBuilder extends Serializable {
+public interface CQLStatementBuilder<T extends CQLStatementTupleMapper> extends Serializable {
/**
* Builds a new {@link CQLStatementTupleMapper} instance.
* @return a new CQLStatementMapper instance.
*/
- <T extends CQLStatementTupleMapper> T build();
+ T build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
index 32c86dd..b100d9a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -38,10 +38,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
*/
public interface CQLStatementTupleMapper extends Serializable {
- public static final String FIELD_KEYSPACE = "keyspace";
- public static final String FIELD_TABLE = "table";
- public static final String FIELD_VALUES = "value";
-
/**
* Maps a given tuple to one or multiple CQL statements.
*
@@ -52,21 +48,6 @@ public interface CQLStatementTupleMapper extends Serializable {
*/
List<Statement> map(Map conf, Session session, ITuple tuple);
- public static class InsertCQLStatementTupleMapper implements CQLStatementTupleMapper {
- @Override
- public List<Statement> map(Map conf, Session session, ITuple tuple) {
- Optional<String> ks = Optional.fromNullable(tuple.contains(FIELD_KEYSPACE) ? tuple.getStringByField(FIELD_KEYSPACE) : null);
- String table = tuple.getStringByField(FIELD_TABLE);
- Map<String, Object> values = (Map<String, Object>) tuple.getValueByField(FIELD_VALUES);
-
- final Insert stmt = (ks.isPresent()) ? insertInto(ks.get(), table) : insertInto(table);
- for(Map.Entry<String, Object> v : values.entrySet())
- stmt.value(v.getKey(), v.getValue());
-
- return Arrays.asList((Statement)stmt);
- }
- }
-
public static class DynamicCQLStatementTupleMapper implements CQLStatementTupleMapper {
private List<CQLStatementBuilder> builders;
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
deleted file mode 100644
index 574eac9..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to a table name.
- *
- */
-public interface CQLTableTupleMapper extends Serializable {
-
- /**
- * Returns a table's name from the specified tuple.
- *
- * @param tuple the incoming tuple.
- * @return the table name.
- */
- String map(ITuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
deleted file mode 100644
index 2cb8e4c..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import org.apache.storm.cassandra.query.selector.FieldSelector;
-
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to Map of values of a CQL statement.
- *
- */
-public interface CQLValuesTupleMapper extends Serializable {
-
- /**
- * Map the specified {@code tuple} to values of CQL statement(s).
- * @param tuple the incoming tuple to map.
- * @return values of CQL statement(s)
- */
- Map<String, Object> map(ITuple tuple);
-
- /**
- * Default {@link CQLValuesTupleMapper} implementation to get specifics tuple's fields.
- */
- public static class WithFieldTupleMapper implements CQLValuesTupleMapper {
- private List<FieldSelector> fields;
-
- public WithFieldTupleMapper(List<FieldSelector> fields) {
- this.fields = fields;
- }
-
- @Override
- public Map<String, Object> map(ITuple tuple) {
- Map<String, Object> ret = new LinkedHashMap<>();
- for(FieldSelector fs : fields)
- fs.selectAndPut(tuple, ret);
- return ret;
- }
- }
-
- /**
- * Default {@link CQLValuesTupleMapper} implementation to get all tuple's fields.
- */
- public static class AllTupleMapper implements CQLValuesTupleMapper {
- @Override
- public Map<String, Object> map(ITuple tuple) {
- Map<String, Object> ret = new LinkedHashMap<>();
- for(String name : tuple.getFields())
- ret.put(name, tuple.getValueByField(name));
- return ret;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
new file mode 100644
index 0000000..06643a5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Immutable holder pairing a CQL column name with its (possibly null) value.
+ */
+public class Column<T> implements Serializable {
+
+ private final String columnName;
+ private final T val;
+
+ public Column(String columnName, T val) {
+ this.columnName = columnName;
+ this.val = val;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public T getVal() {
+ return val;
+ }
+
+ public boolean isNull() { return val == null;}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Column column = (Column) o;
+
+ if (columnName != null ? !columnName.equals(column.columnName) : column.columnName != null) return false;
+ if (val != null ? !val.equals(column.val) : column.val != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = columnName != null ? columnName.hashCode() : 0;
+ result = 31 * result + (val != null ? val.hashCode() : 0);
+ return result;
+ }
+
+ public static Object[] getVals(List<Column> columns) {
+ List<Object> vals = new ArrayList<>(columns.size());
+ for(Column c : columns)
+ vals.add(c.getVal());
+ return vals.toArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
new file mode 100644
index 0000000..2ab8f92
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default interface that defines how a Storm tuple maps to a list of columns representing a row in a database.
+ */
+public interface CqlMapper extends Serializable {
+
+ /**
+ * Maps the specified input tuple to a list of CQL columns.
+ * @param tuple the input tuple.
+ * @return the list of {@link Column}s mapped from the tuple.
+ */
+ List<Column> map(ITuple tuple);
+
+
+ public static final class SelectableCqlMapper implements CqlMapper {
+
+ private final List<FieldSelector> selectors;
+
+ /**
+ * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.SelectableCqlMapper} instance.
+ * @param selectors list of selectors used to extract column values from tuple.
+ */
+ public SelectableCqlMapper(List<FieldSelector> selectors) {
+ this.selectors = selectors;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Column> map(ITuple tuple) {
+ List<Column> columns = new ArrayList<>(selectors.size());
+ for(FieldSelector selector : selectors)
+ columns.add(selector.select(tuple));
+ return columns;
+ }
+ }
+
+ /**
+ * Default {@link CqlMapper} to map all tuple values to column.
+ */
+ public static final class DefaultCqlMapper implements CqlMapper {
+
+ /**
+ * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.DefaultCqlMapper} instance.
+ */
+ public DefaultCqlMapper() {}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Column> map(ITuple tuple) {
+ List<Column> columns = new ArrayList<>(tuple.size());
+ for(String name : tuple.getFields())
+ columns.add(new Column(name, tuple.getValueByField(name)));
+ return columns;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
deleted file mode 100644
index 683824e..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
- *
- */
-public abstract class SimpleCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Statement> map(Map conf, Session session, ITuple tuple) {
- return Arrays.asList(map(tuple));
- }
-
- /**
- * Maps a given tuple to a single CQL statements.
- *
- * @param tuple the incoming tuple to map.
- * @return a list of {@link com.datastax.driver.core.Statement}.
- */
- public abstract Statement map(ITuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..52502b0
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.BoundCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.PreparedStatementBinder;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.storm.cassandra.query.ContextQuery.*;
+
+public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<BoundCQLStatementTupleMapper>, Serializable {
+
+ private final ContextQuery contextQuery;
+
+ private CqlMapper mapper;
+
+ private List<String> routingKeys;
+
+ private PreparedStatementBinder binder;
+
+ /**
+ * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+ * @param cql
+ */
+ public BoundCQLStatementMapperBuilder(String cql) {
+ this.contextQuery = new StaticContextQuery(cql);
+ }
+
+ /**
+ * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+ * @param contextQuery
+ */
+ public BoundCQLStatementMapperBuilder(ContextQuery contextQuery) {
+ this.contextQuery = contextQuery;
+ }
+
+ /**
+ * Includes only the specified tuple fields.
+ *
+ * @param fields a list of field selector.
+ */
+ public final BoundCQLStatementMapperBuilder bind(final FieldSelector... fields) {
+ this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+ return this;
+ }
+
+ /**
+ * Uses the specified mapper to map tuples to the columns bound to the statement.
+ *
+ * @param mapper a column mapper.
+ */
+ public final BoundCQLStatementMapperBuilder bind(CqlMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public final BoundCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+ this.routingKeys = Arrays.asList(fields);
+ return this;
+ }
+
+ public final BoundCQLStatementMapperBuilder byNamedSetters() {
+ this.binder = new PreparedStatementBinder.CQL3NamedSettersBinder();
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public BoundCQLStatementTupleMapper build() {
+ return new BoundCQLStatementTupleMapper(contextQuery, mapper, getRoutingKeyGenerator(), getStatementBinder());
+ }
+
+ private RoutingKeyGenerator getRoutingKeyGenerator() {
+ return (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null;
+ }
+
+ private PreparedStatementBinder getStatementBinder() {
+ return (binder != null) ? binder : new PreparedStatementBinder.DefaultBinder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..401f1bf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Default class to build {@link org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper} instance.
+ */
+public class SimpleCQLStatementMapperBuilder implements CQLStatementBuilder<SimpleCQLStatementMapper>, Serializable {
+
+ private final String queryString;
+
+ private CqlMapper mapper;
+
+ private List<String> routingKeys;
+
+ /**
+ * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+ * @param queryString a valid CQL query string.
+ */
+ public SimpleCQLStatementMapperBuilder(String queryString) {
+ this.queryString = queryString;
+ }
+
+ /**
+ * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+ * @param builtStatement a built statement whose query string is used.
+ */
+ public SimpleCQLStatementMapperBuilder(BuiltStatement builtStatement) {
+ this.queryString = builtStatement.getQueryString();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SimpleCQLStatementMapper build() {
+ return new SimpleCQLStatementMapper(queryString, mapper, (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null );
+ }
+
+ /**
+ * Includes only the specified tuple fields.
+ *
+ * @param fields a list of field selector.
+ */
+ public final SimpleCQLStatementMapperBuilder with(final FieldSelector... fields) {
+ this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+ return this;
+ }
+
+ public final SimpleCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+ this.routingKeys = Arrays.asList(fields);
+ return this;
+ }
+
+ /**
+ * Uses the specified mapper to map tuples to columns.
+ *
+ * @param mapper a column mapper.
+ */
+ public final SimpleCQLStatementMapperBuilder with(CqlMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
new file mode 100644
index 0000000..fe948f5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+public class BatchCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+ private final List<CQLStatementTupleMapper> mappers;
+ private final BatchStatement.Type type;
+
+ /**
+ * Creates a new {@link BatchCQLStatementTupleMapper} instance.
+ * @param type
+ * @param mappers
+ */
+ public BatchCQLStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
+ this.mappers = new ArrayList<>(mappers);
+ this.type = type;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Statement> map(Map conf, Session session, ITuple tuple) {
+ final BatchStatement batch = new BatchStatement(this.type);
+ for(CQLStatementTupleMapper m : mappers)
+ batch.addAll(m.map(conf, session, tuple));
+ return Arrays.asList((Statement)batch);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
new file mode 100644
index 0000000..8cce418
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+ private final ContextQuery contextQuery;
+
+ private final CqlMapper mapper;
+
+ private Map<String, PreparedStatement> cache = new HashMap<>();
+
+ private final RoutingKeyGenerator rkGenerator;
+
+ private final PreparedStatementBinder binder;
+
+ /**
+ * Creates a new {@link BoundCQLStatementTupleMapper} instance.
+ *
+ * @param contextQuery
+ * @param mapper the mapper that extracts the columns to bind from each
+ * input tuple; must not be null.
+ * @param rkGenerator
+ * @param binder
+ */
+ public BoundCQLStatementTupleMapper(ContextQuery contextQuery, CqlMapper mapper, RoutingKeyGenerator rkGenerator, PreparedStatementBinder binder) {
+ Preconditions.checkNotNull(contextQuery, "ContextQuery must not be null");
+ Preconditions.checkNotNull(mapper, "Mapper should not be null");
+ this.contextQuery = contextQuery;
+ this.mapper = mapper;
+ this.rkGenerator = rkGenerator;
+ this.binder = binder;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Statement> map(Map config, Session session, ITuple tuple) {
+ final List<Column> columns = mapper.map(tuple);
+
+ final String query = contextQuery.resolves(config, tuple);
+
+ PreparedStatement statement = getPreparedStatement(session, query);
+ if(hasRoutingKeys()) {
+ List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple);
+ if( keys.size() == 1)
+ statement.setRoutingKey(keys.get(0));
+ else
+ statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()]));
+ }
+
+ return Arrays.asList((Statement) this.binder.apply(statement, columns));
+ }
+
+ private boolean hasRoutingKeys() {
+ return rkGenerator != null;
+ }
+
+ /**
+ * Get or prepare a statement using the specified session and the query.
+ *
+ * @param session The cassandra session.
+ * @param query The CQL query to prepare.
+ */
+ private PreparedStatement getPreparedStatement(Session session, String query) {
+ PreparedStatement statement = cache.get(query);
+ if( statement == null) {
+ statement = session.prepare(query);
+ cache.put(query, statement);
+ }
+ return statement;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
deleted file mode 100644
index b7ded14..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.ContextQuery;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
-
-
-public class BoundStatementMapperBuilder implements Serializable {
- private final ContextQuery contextQuery;
-
- /**
- * Creates a new {@link BoundStatementMapperBuilder} instance.
- * @param cql
- */
- public BoundStatementMapperBuilder(String cql) {
- this.contextQuery = new StaticContextQuery(cql);
- }
-
- /**
- * Creates a new {@link BoundStatementMapperBuilder} instance.
- * @param contextQuery
- */
- public BoundStatementMapperBuilder(ContextQuery contextQuery) {
- this.contextQuery = contextQuery;
- }
-
- public CQLStatementTupleMapper bind(final CQLValuesTupleMapper mapper) {
- return new CQLBoundStatementTupleMapper(contextQuery, mapper);
- }
-
- public static class CQLBoundStatementTupleMapper implements CQLStatementTupleMapper {
-
- private final ContextQuery contextQuery;
-
- private final CQLValuesTupleMapper mapper;
-
- // should be a lru-map or weakhashmap else this may lead to memory leaks.
- private Map<String, PreparedStatement> cache = new HashMap<>();
-
- /**
- * Creates a new {@link CQLBoundStatementTupleMapper} instance.
- *
- * @param contextQuery
- * @param mapper
- */
- CQLBoundStatementTupleMapper(ContextQuery contextQuery, CQLValuesTupleMapper mapper) {
- this.contextQuery = contextQuery;
- this.mapper = mapper;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Statement> map(Map config, Session session, ITuple tuple) {
- final Map<String, Object> values = mapper.map(tuple);
- final String query = contextQuery.resolves(config, tuple);
- Object[] objects = values.values().toArray(new Object[values.size()]);
- PreparedStatement statement = getPreparedStatement(session, query);
- // todo bind objects in the same sequence as the statement expects.
- return Arrays.asList((Statement) statement.bind(objects));
- }
-
- /**
- * Get or prepare a statement using the specified session and the query.
- * *
- * @param session The cassandra session.
- * @param query The CQL query to prepare.
- */
- private PreparedStatement getPreparedStatement(Session session, String query) {
- PreparedStatement statement = cache.get(query);
- if (statement == null) {
- statement = session.prepare(query);
- cache.put(query, statement);
- }
- return statement;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
deleted file mode 100644
index 8eddb04..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.base.Preconditions;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLTableTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class InsertStatementBuilder<T extends CQLValuesTupleMapper> implements CQLStatementBuilder {
- /**
- * Optional name of the table into insert.
- */
- protected final String table;
- /**
- * Optional keyspace name to insert.
- */
- protected final String keyspace;
- /**
- * Optional mapper to retrieve a table name from tuple.
- */
- protected final CQLTableTupleMapper mapper2TableName;
-
- protected T valuesMapper;
-
- protected boolean ifNotExist;
-
- protected Integer ttlInSeconds;
-
- protected ConsistencyLevel level;
-
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param table name of the table into insert.
- */
- public InsertStatementBuilder(String table) {
- this(null, table, null);
- }
-
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param table name of the table into insert.
- * @param keyspace the keyspace's name to which the table belongs
- */
- public InsertStatementBuilder(String table, String keyspace) {
- this(keyspace, table, null);
- }
-
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param mapper2TableName the mapper that specify the table in which insert
- * @param keyspace the name of the keyspace to which the table belongs
- */
- public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName, String keyspace) {
- this(keyspace, null, mapper2TableName);
- }
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param mapper2TableName the mapper that specify the table in which insert
- */
- public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName) {
- this(null, null, mapper2TableName);
- }
-
- private InsertStatementBuilder(String keyspace, String table, CQLTableTupleMapper mapper2TableName) {
- this.keyspace = keyspace;
- this.table = table;
- this.mapper2TableName = mapper2TableName;
- }
-
- /**
- * Adds "IF NOT EXISTS" clause to the statement.
- * @see com.datastax.driver.core.querybuilder.Insert#ifNotExists()
- */
- public InsertStatementBuilder ifNotExists() {
- this.ifNotExist = true;
- return this;
- }
-
- public InsertStatementBuilder usingTTL(Long time, TimeUnit unit) {
- this.ttlInSeconds = (int) unit.toSeconds(time);
- return this;
- }
-
- /**
- * Sets the consistency level used for this statement.
- *
- * @param level a ConsistencyLevel.
- */
- public InsertStatementBuilder withConsistencyLevel(ConsistencyLevel level) {
- this.level = level;
- return this;
- }
-
- /**
- * Maps tuple to values.
- *
- * @see com.datastax.driver.core.querybuilder.Insert#value(String, Object)
- */
- public InsertStatementBuilder values(final T valuesMapper) {
- this.valuesMapper = valuesMapper;
- return this;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public SimpleCQLStatementTupleMapper build() {
- Preconditions.checkState(null != table, "Table name must not be null");
- return new SimpleCQLStatementTupleMapper() {
- @Override
- public Statement map(ITuple tuple) {
- Insert stmt = QueryBuilder.insertInto(keyspace, table);
- for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
- stmt.value(entry.getKey(), entry.getValue());
- if (ttlInSeconds != null) stmt.using(QueryBuilder.ttl(ttlInSeconds));
- if (ifNotExist) stmt.ifNotExists();
- if (level != null) stmt.setConsistencyLevel(level);
- return stmt;
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
new file mode 100644
index 0000000..7a6c78d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import com.datastax.driver.core.*;
+import org.apache.storm.cassandra.query.Column;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Strategy that binds the values of a list of {@link Column} to a {@link PreparedStatement}.
+ */
+public interface PreparedStatementBinder extends Serializable {
+
+    /**
+     * Binds the values of the given columns to a prepared statement.
+     *
+     * @param statement the prepared statement to bind
+     * @param columns the columns providing the values
+     * @return the resulting bound statement
+     */
+    public BoundStatement apply(PreparedStatement statement, List<Column> columns);
+
+    /**
+     * Passes the values returned by {@code Column.getVals} to {@link PreparedStatement#bind(Object...)}.
+     */
+    public static final class DefaultBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            Object[] values = Column.getVals(columns);
+            return statement.bind(values);
+        }
+    }
+
+    /**
+     * Binds each column by name through the typed setters of {@link BoundStatement}.
+     */
+    public static final class CQL3NamedSettersBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            BoundStatement boundStatement = statement.bind();
+            for (Column col : columns) {
+                // For native protocol V3 or below, all variables must be bound.
+                // With native protocol V4 or above, variables can be left unset,
+                // in which case they will be ignored server side (no tombstones will be generated).
+                if (col.isNull()) {
+                    boundStatement.setToNull(col.getColumnName());
+                } else {
+                    bind(boundStatement, col.getColumnName(), col.getVal());
+                }
+            }
+            // Return the statement populated by the named setters above instead of re-binding positionally.
+            return boundStatement;
+        }
+
+        /**
+         * Binds a value to the named variable using the setter that matches its runtime type.
+         * The type checks come from {@link com.datastax.driver.core.TypeCodec#getDataTypeFor(Object)}.
+         *
+         * @param statement the bound statement to populate
+         * @param name the name of the variable to set
+         * @param value the value to bind; must not be null
+         * @throws IllegalArgumentException if no setter matches the runtime type of the value
+         */
+        static void bind(BoundStatement statement, String name, Object value) {
+            // Starts with ByteBuffer, so that if already serialized value are provided, we don't have the
+            // cost of testing a bunch of other types first
+            if (value instanceof ByteBuffer) {
+                statement.setBytes(name, (ByteBuffer) value);
+            } else if (value instanceof Integer) {
+                statement.setInt(name, (Integer) value);
+            } else if (value instanceof Long) {
+                statement.setLong(name, (Long) value);
+            } else if (value instanceof Float) {
+                statement.setFloat(name, (Float) value);
+            } else if (value instanceof Double) {
+                statement.setDouble(name, (Double) value);
+            } else if (value instanceof BigDecimal) {
+                statement.setDecimal(name, (BigDecimal) value);
+            } else if (value instanceof BigInteger) {
+                statement.setVarint(name, (BigInteger) value);
+            } else if (value instanceof String) {
+                statement.setString(name, (String) value);
+            } else if (value instanceof Boolean) {
+                statement.setBool(name, (Boolean) value);
+            } else if (value instanceof InetAddress) {
+                statement.setInet(name, (InetAddress) value);
+            } else if (value instanceof Date) {
+                statement.setDate(name, (Date) value);
+            } else if (value instanceof UUID) {
+                statement.setUUID(name, (UUID) value);
+            } else if (value instanceof List) {
+                statement.setList(name, (List) value);
+            } else if (value instanceof Set) {
+                statement.setSet(name, (Set) value);
+            } else if (value instanceof Map) {
+                statement.setMap(name, (Map) value);
+            } else if (value instanceof UDTValue) {
+                statement.setUDTValue(name, (UDTValue) value);
+            } else if (value instanceof TupleValue) {
+                statement.setTupleValue(name, (TupleValue) value);
+            } else {
+                throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
new file mode 100644
index 0000000..3f4f47e
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RoutingKeyGenerator implements Serializable {
+
+    private List<String> routingKeys;
+
+    /**
+     * Creates a new {@link RoutingKeyGenerator} instance.
+     * @param routingKeys names of the tuple fields whose values are serialized as routing keys; must not be null
+     */
+    public RoutingKeyGenerator(List<String> routingKeys) {
+        this.routingKeys = Preconditions.checkNotNull(routingKeys);
+    }
+
+    /** Serializes the tuple value of each configured field, in list order, via {@link DataType#serializeValue}. */
+    public List<ByteBuffer> getRoutingKeys(ITuple tuple) {
+        final List<ByteBuffer> serialized = new ArrayList<>(routingKeys.size());
+        for (final String fieldName : routingKeys) {
+            final Object fieldValue = tuple.getValueByField(fieldName);
+            serialized.add(DataType.serializeValue(fieldValue, ProtocolVersion.NEWEST_SUPPORTED));
+        }
+        return serialized;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
new file mode 100644
index 0000000..ab9adbf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maps a tuple to a single {@link SimpleStatement} built from a fixed CQL query string.
+ */
+public class SimpleCQLStatementMapper implements CQLStatementTupleMapper {
+
+    private final String queryString;
+    private final CqlMapper mapper;
+    private final RoutingKeyGenerator rkGenerator;
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance without a routing key generator.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper) {
+        this(queryString, mapper, null);
+    }
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     * @param rkGenerator the routing key generator, or {@code null} to leave the routing key unset.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper, RoutingKeyGenerator rkGenerator) {
+        this.queryString = Preconditions.checkNotNull(queryString, "Query string must not be null");
+        this.mapper = Preconditions.checkNotNull(mapper, "Mapper should not be null");
+        this.rkGenerator = rkGenerator;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        final Object[] boundValues = Column.getVals(mapper.map(tuple));
+        final SimpleStatement cql = new SimpleStatement(queryString, boundValues);
+        applyRoutingKey(cql, tuple);
+        return Arrays.asList((Statement) cql);
+    }
+
+    /** Sets the routing key when a generator is configured; exactly one key is set directly, otherwise as components. */
+    private void applyRoutingKey(SimpleStatement cql, ITuple tuple) {
+        if (rkGenerator == null) {
+            return;
+        }
+        final List<ByteBuffer> components = rkGenerator.getRoutingKeys(tuple);
+        if (components.size() != 1) {
+            cql.setRoutingKey(components.toArray(new ByteBuffer[components.size()]));
+        } else {
+            cql.setRoutingKey(components.get(0));
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
deleted file mode 100644
index 70696ab..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Clause;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Update;
-import org.apache.storm.cassandra.query.CQLClauseTupleMapper;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
-
-public final class UpdateStatementBuilder implements CQLStatementBuilder {
-
- /**
- * The name of table to update.
- */
- private final String table;
- /**
- * The keyspace of the table.
- */
- private final String keyspace;
-
- private CQLValuesTupleMapper valuesMapper;
-
- private CQLClauseTupleMapper clausesMapper;
-
- private CQLClauseTupleMapper onlyIfClausesMapper;
- /**
- * Creates a new {@link UpdateStatementBuilder} instance.
- * @param table the name of table to update.
- */
- public UpdateStatementBuilder(String table) {
- this(table, null);
- }
-
- /**
- * Creates a new {@link UpdateStatementBuilder} instance.
- * @param table the name of table to update.
- * @param keyspace the keyspace of the table.
- */
- public UpdateStatementBuilder(String table, String keyspace) {
- this.table = table;
- this.keyspace = keyspace;
- }
-
- /**
- * Maps output tuple to values.
- * @see com.datastax.driver.core.querybuilder.Update#with(com.datastax.driver.core.querybuilder.Assignment)
- */
- public UpdateStatementBuilder with(final CQLValuesTupleMapper values) {
- this.valuesMapper = values;
- return this;
- }
-
- /**
- * Maps output tuple to some Where clauses.
- * @see com.datastax.driver.core.querybuilder.Update#where(com.datastax.driver.core.querybuilder.Clause)
- */
- public UpdateStatementBuilder where(final CQLClauseTupleMapper clauses) {
- this.clausesMapper = clauses;
- return this;
- }
-
- /**
- * Maps output tuple to some If clauses.
- * @see com.datastax.driver.core.querybuilder.Update#onlyIf(com.datastax.driver.core.querybuilder.Clause)
- */
- public UpdateStatementBuilder onlyIf(final CQLClauseTupleMapper clauses) {
- this.onlyIfClausesMapper = clauses;
- return this;
- }
-
- @Override
- public SimpleCQLStatementTupleMapper build() {
- return new SimpleCQLStatementTupleMapper() {
- @Override
- public Statement map(ITuple tuple) {
- Update stmt = QueryBuilder.update(keyspace, table);
- for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
- stmt.with(set(entry.getKey(), entry.getValue()));
- for(Clause clause : clausesMapper.map(tuple))
- stmt.where(clause);
- if( hasIfClauses())
- for(Clause clause : onlyIfClausesMapper.map(tuple)) {
- stmt.onlyIf(clause);
- }
- return stmt;
- }
- };
- }
-
- private boolean hasIfClauses() {
- return onlyIfClausesMapper != null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
index b68138a..8ce651e 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -20,6 +20,7 @@ package org.apache.storm.cassandra.query.selector;
import backtype.storm.tuple.ITuple;
import com.datastax.driver.core.utils.UUIDs;
+import org.apache.storm.cassandra.query.Column;
import java.io.Serializable;
import java.util.Map;
@@ -41,9 +42,8 @@ public class FieldSelector implements Serializable {
this.field = field;
}
- public void selectAndPut(ITuple t, Map<String, Object> values) {
- values.put(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
-
+ public Column select(ITuple t) {
+ return new Column<>(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
}
/**