You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:31:06 UTC
[2/6] storm git commit: STORM-1348 - refactor API to remove
Insert/Update builder in Cassandra connector
STORM-1348 - refactor API to remove Insert/Update builder in Cassandra connector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c2997cf9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c2997cf9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c2997cf9
Branch: refs/heads/master
Commit: c2997cf9f116297812505451269568484196b4c0
Parents: 6d7b968
Author: Florian Hussonnois <fl...@gmail.com>
Authored: Sun Dec 6 22:23:29 2015 +0100
Committer: Florian Hussonnois <fl...@gmail.com>
Committed: Mon Dec 14 11:57:19 2015 +0100
----------------------------------------------------------------------
external/storm-cassandra/README.md | 70 ++++++---
.../cassandra/DynamicStatementBuilder.java | 90 +++--------
.../query/BaseCQLStatementTupleMapper.java | 51 +++++++
.../query/BatchStatementTupleMapper.java | 57 -------
.../cassandra/query/CQLClauseTupleMapper.java | 36 -----
.../cassandra/query/CQLStatementBuilder.java | 4 +-
.../query/CQLStatementTupleMapper.java | 19 ---
.../cassandra/query/CQLTableTupleMapper.java | 39 -----
.../cassandra/query/CQLValuesTupleMapper.java | 74 ---------
.../apache/storm/cassandra/query/Column.java | 74 +++++++++
.../apache/storm/cassandra/query/CqlMapper.java | 86 +++++++++++
.../query/SimpleCQLStatementTupleMapper.java | 51 -------
.../builder/BoundCQLStatementMapperBuilder.java | 103 +++++++++++++
.../SimpleCQLStatementMapperBuilder.java | 91 +++++++++++
.../impl/BatchCQLStatementTupleMapper.java | 58 +++++++
.../impl/BoundCQLStatementTupleMapper.java | 106 +++++++++++++
.../query/impl/BoundStatementMapperBuilder.java | 109 -------------
.../query/impl/InsertStatementBuilder.java | 153 -------------------
.../query/impl/PreparedStatementBinder.java | 136 +++++++++++++++++
.../query/impl/RoutingKeyGenerator.java | 52 +++++++
.../query/impl/SimpleCQLStatementMapper.java | 88 +++++++++++
.../query/impl/UpdateStatementBuilder.java | 118 --------------
.../cassandra/query/selector/FieldSelector.java | 6 +-
.../cassandra/DynamicStatementBuilderTest.java | 108 +++++++------
.../bolt/BatchCassandraWriterBoltTest.java | 10 +-
.../cassandra/bolt/CassandraWriterBoltTest.java | 10 +-
26 files changed, 991 insertions(+), 808 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index 943fe5e..1edf708 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -39,17 +39,24 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
```java
new CassandraWriterBolt(
- insertInto("album")
- .values(
- with(fields("title", "year", "performer", "genre", "tracks")
- ).build());
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with(
+ fields("title", "year", "performer", "genre", "tracks")
+ )
+ )
+ );
```
+
##### Insert query including all tuple fields.
```java
new CassandraWriterBolt(
- insertInto("album")
- .values(all()).build());
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with( all() )
+ )
+ );
```
##### Insert multiple queries from one input tuple.
@@ -57,32 +64,49 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
new CassandraWriterBolt(
async(
- insertInto("titles_per_album").values(all()),
- insertInto("titles_per_performer").values(all())
+ simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+ simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
)
);
```
-##### Insert query including some fields with aliases
+##### Insert query using QueryBuilder
```java
new CassandraWriterBolt(
- insertInto("album")
- .values(
- with(field("ti"),as("title",
- field("ye").as("year")),
- field("pe").as("performer")),
- field("ge").as("genre")),
- field("tr").as("tracks")),
- ).build());
+ async(
+ simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .with(all())
+ )
+ );
```
##### Insert query with static bound query
```java
new CassandraWriterBolt(
- boundQuery("INSERT INTO album (\"title\", \"year\", \"performer\", \"genre\", \"tracks\") VALUES(?, ?, ?, ?, ?);")
- .bind(all());
+ async(
+ boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
+ .bind(all())
+ )
+ );
+```
+
+##### Insert query with static bound query using named setters and aliases
+```java
+
+ new CassandraWriterBolt(
+ async(
+ boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
+ .bind(
+ field("ti").as("title"),
+ field("ye").as("year"),
+ field("pe").as("performer"),
+ field("ge").as("genre"),
+ field("tr").as("tracks")
+ ).byNamedSetters()
+ )
+ );
```
##### Insert query with bound statement load from storm configuration
@@ -106,14 +130,14 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*
// Logged
new CassandraWriterBolt(loggedBatch(
- insertInto("title_per_album").values(all())
- insertInto("title_per_performer").values(all())
+ simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+ simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
)
);
// UnLogged
new CassandraWriterBolt(unLoggedBatch(
- insertInto("title_per_album").values(all())
- insertInto("title_per_performer").values(all())
+ simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
+ simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
)
);
```
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index deea8da..ce53ea3 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -19,10 +19,11 @@
package org.apache.storm.cassandra;
import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
import org.apache.storm.cassandra.query.*;
-import org.apache.storm.cassandra.query.impl.BoundStatementMapperBuilder;
-import org.apache.storm.cassandra.query.impl.InsertStatementBuilder;
-import org.apache.storm.cassandra.query.impl.UpdateStatementBuilder;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
+import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
import org.apache.storm.cassandra.query.selector.FieldSelector;
import java.io.Serializable;
@@ -33,73 +34,32 @@ public class DynamicStatementBuilder implements Serializable {
private DynamicStatementBuilder() {
}
- /**
- * Builds a new insert statement for the specified table.
- *
- * @param table the table's name.
- * @return a new {@link InsertStatementBuilder} instance.
- */
- public static final InsertStatementBuilder insertInto(String table) {
- return new InsertStatementBuilder(table);
- }
- /**
- * Builds a new insert statement based on the specified CQL mapper.
- *
- * @param mapper the CQL mapper.
- * @return a new {@link InsertStatementBuilder} instance.
- */
- public static final InsertStatementBuilder insertInto(CQLTableTupleMapper mapper) {
- return new InsertStatementBuilder(mapper);
- }
- /**
- * Builds a new insert statement for the specified keyspace and table.
- *
- * @param ks the keyspace to use.
- * @param table the table's name.
- * @return a new {@link InsertStatementBuilder} instance.
- */
- public static final InsertStatementBuilder insertInto(String ks, String table) {
- return new InsertStatementBuilder(table, ks);
+ public static final SimpleCQLStatementMapperBuilder simpleQuery(String queryString) {
+ return new SimpleCQLStatementMapperBuilder(queryString);
}
- /**
- * Builds a new update statement for the specified table.
- *
- * @param table the table's name.
- * @return a new {@link UpdateStatementBuilder} instance.
- */
- public static final UpdateStatementBuilder update(String table) {
- return new UpdateStatementBuilder(table);
- }
-
- /**
- * Builds a new update statement for the specified keyspace and table.
- *
- * @param table the table's name.
- * @return a new {@link UpdateStatementBuilder} instance.
- */
- public static final UpdateStatementBuilder update(String ks, String table) {
- return new UpdateStatementBuilder(table, ks);
+ public static final SimpleCQLStatementMapperBuilder simpleQuery(BuiltStatement builtStatement) {
+ return new SimpleCQLStatementMapperBuilder(builtStatement);
}
/**
* Builds a new bound statement based on the specified query.
*
* @param cql the query.
- * @return a new {@link BoundStatementMapperBuilder} instance.
+ * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
*/
- public static final BoundStatementMapperBuilder boundQuery(String cql) {
- return new BoundStatementMapperBuilder(cql);
+ public static final BoundCQLStatementMapperBuilder boundQuery(String cql) {
+ return new BoundCQLStatementMapperBuilder(cql);
}
/**
* Builds a new bound statement identified by the given field.
*
* @param field a context used to resolve the cassandra query.
- * @return a new {@link BoundStatementMapperBuilder} instance.
+ * @return a new {@link org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder} instance.
*/
- public static final BoundStatementMapperBuilder boundQuery(ContextQuery field) {
- return new BoundStatementMapperBuilder(field);
+ public static final BoundCQLStatementMapperBuilder boundQuery(ContextQuery field) {
+ return new BoundCQLStatementMapperBuilder(field);
}
/**
@@ -115,27 +75,27 @@ public class DynamicStatementBuilder implements Serializable {
/**
* Creates a new {@link com.datastax.driver.core.BatchStatement.Type#LOGGED} batch statement for the specified CQL statement builders.
*/
- public static final BatchStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
+ public static final BatchCQLStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) {
return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
}
/**
* Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders.
*/
- public static final BatchStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
+ public static final BatchCQLStatementTupleMapper counterBatch(CQLStatementBuilder... builders) {
return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
}
/**
* Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders.
*/
- public static final BatchStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
+ public static final BatchCQLStatementTupleMapper unLoggedBatch(CQLStatementBuilder... builders) {
return newBatchStatementBuilder(BatchStatement.Type.UNLOGGED, builders);
}
- private static BatchStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
+ private static BatchCQLStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) {
List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length);
for(CQLStatementBuilder b : Arrays.asList(builders))
mappers.add(b.build());
- return new BatchStatementTupleMapper(type, mappers);
+ return new BatchCQLStatementTupleMapper(type, mappers);
}
/**
@@ -181,19 +141,11 @@ public class DynamicStatementBuilder implements Serializable {
return fl.toArray(new FieldSelector[size]);
}
- /**
- * Includes only the specified tuple fields.
- *
- * @param fields a list of field selector.
- */
- public static final CQLValuesTupleMapper with(final FieldSelector... fields) {
- return new CQLValuesTupleMapper.WithFieldTupleMapper(Arrays.asList(fields));
- }
/**
* Includes all tuple fields.
*/
- public static final CQLValuesTupleMapper all() {
- return new CQLValuesTupleMapper.AllTupleMapper();
+ public static final CqlMapper.DefaultCqlMapper all() {
+ return new CqlMapper.DefaultCqlMapper();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
new file mode 100644
index 0000000..c9ba6fa
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract base class that maps a {@link backtype.storm.tuple.ITuple} to a single CQL {@link com.datastax.driver.core.Statement}.
+ *
+ */
+public abstract class BaseCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Statement> map(Map conf, Session session, ITuple tuple) {
+ return Arrays.asList(map(tuple));
+ }
+
+ /**
+ * Maps a given tuple to a single CQL statement.
+ *
+ * @param tuple the incoming tuple to map.
+ * @return the {@link com.datastax.driver.core.Statement} built from the tuple.
+ */
+ public abstract Statement map(ITuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
deleted file mode 100644
index 32ff1de..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BatchStatementTupleMapper.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-
-public class BatchStatementTupleMapper implements CQLStatementTupleMapper {
-
- private final List<CQLStatementTupleMapper> mappers;
- private final BatchStatement.Type type;
-
- /**
- * Creates a new {@link BatchStatementTupleMapper} instance.
- * @param type
- * @param mappers
- */
- public BatchStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
- this.mappers = new ArrayList<>(mappers);
- this.type = type;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Statement> map(Map conf, Session session, ITuple tuple) {
- final BatchStatement batch = new BatchStatement(this.type);
- for(CQLStatementTupleMapper m : mappers)
- batch.addAll(m.map(conf, session, tuple));
- return Arrays.asList((Statement)batch);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
deleted file mode 100644
index baa4685..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLClauseTupleMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.querybuilder.Clause;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to
- * a list of cassandra {@link com.datastax.driver.core.querybuilder.Clause}s.
- *
- */
-public interface CQLClauseTupleMapper extends Serializable {
-
- List<Clause> map(ITuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
index 6c04d1c..9ddb58a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java
@@ -21,11 +21,11 @@ package org.apache.storm.cassandra.query;
import java.io.Serializable;
-public interface CQLStatementBuilder extends Serializable {
+public interface CQLStatementBuilder<T extends CQLStatementTupleMapper> extends Serializable {
/**
* Builds a new {@link CQLStatementTupleMapper} instance.
* @return a new CQLStatementMapper instance.
*/
- <T extends CQLStatementTupleMapper> T build();
+ T build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
index 32c86dd..b100d9a 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java
@@ -38,10 +38,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
*/
public interface CQLStatementTupleMapper extends Serializable {
- public static final String FIELD_KEYSPACE = "keyspace";
- public static final String FIELD_TABLE = "table";
- public static final String FIELD_VALUES = "value";
-
/**
* Maps a given tuple to one or multiple CQL statements.
*
@@ -52,21 +48,6 @@ public interface CQLStatementTupleMapper extends Serializable {
*/
List<Statement> map(Map conf, Session session, ITuple tuple);
- public static class InsertCQLStatementTupleMapper implements CQLStatementTupleMapper {
- @Override
- public List<Statement> map(Map conf, Session session, ITuple tuple) {
- Optional<String> ks = Optional.fromNullable(tuple.contains(FIELD_KEYSPACE) ? tuple.getStringByField(FIELD_KEYSPACE) : null);
- String table = tuple.getStringByField(FIELD_TABLE);
- Map<String, Object> values = (Map<String, Object>) tuple.getValueByField(FIELD_VALUES);
-
- final Insert stmt = (ks.isPresent()) ? insertInto(ks.get(), table) : insertInto(table);
- for(Map.Entry<String, Object> v : values.entrySet())
- stmt.value(v.getKey(), v.getValue());
-
- return Arrays.asList((Statement)stmt);
- }
- }
-
public static class DynamicCQLStatementTupleMapper implements CQLStatementTupleMapper {
private List<CQLStatementBuilder> builders;
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
deleted file mode 100644
index 574eac9..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLTableTupleMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-
-import java.io.Serializable;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to a table name.
- *
- */
-public interface CQLTableTupleMapper extends Serializable {
-
- /**
- * Returns a table's name from the specified tuple.
- *
- * @param tuple the incoming tuple.
- * @return the table name.
- */
- String map(ITuple tuple);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
deleted file mode 100644
index 2cb8e4c..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLValuesTupleMapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import org.apache.storm.cassandra.query.selector.FieldSelector;
-
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface for mapping a {@link backtype.storm.tuple.ITuple} to Map of values of a CQL statement.
- *
- */
-public interface CQLValuesTupleMapper extends Serializable {
-
- /**
- * Map the specified {@code tuple} to values of CQL statement(s).
- * @param tuple the incoming tuple to map.
- * @return values of CQL statement(s)
- */
- Map<String, Object> map(ITuple tuple);
-
- /**
- * Default {@link CQLValuesTupleMapper} implementation to get specifics tuple's fields.
- */
- public static class WithFieldTupleMapper implements CQLValuesTupleMapper {
- private List<FieldSelector> fields;
-
- public WithFieldTupleMapper(List<FieldSelector> fields) {
- this.fields = fields;
- }
-
- @Override
- public Map<String, Object> map(ITuple tuple) {
- Map<String, Object> ret = new LinkedHashMap<>();
- for(FieldSelector fs : fields)
- fs.selectAndPut(tuple, ret);
- return ret;
- }
- }
-
- /**
- * Default {@link CQLValuesTupleMapper} implementation to get all tuple's fields.
- */
- public static class AllTupleMapper implements CQLValuesTupleMapper {
- @Override
- public Map<String, Object> map(ITuple tuple) {
- Map<String, Object> ret = new LinkedHashMap<>();
- for(String name : tuple.getFields())
- ret.put(name, tuple.getValueByField(name));
- return ret;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
new file mode 100644
index 0000000..06643a5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Pairs a column name with its value.
+ */
+public class Column<T> implements Serializable {
+
+ private final String columnName;
+ private final T val;
+
+ public Column(String columnName, T val) {
+ this.columnName = columnName;
+ this.val = val;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public T getVal() {
+ return val;
+ }
+
+ public boolean isNull() { return val == null;}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Column column = (Column) o;
+
+ if (columnName != null ? !columnName.equals(column.columnName) : column.columnName != null) return false;
+ if (val != null ? !val.equals(column.val) : column.val != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = columnName != null ? columnName.hashCode() : 0;
+ result = 31 * result + (val != null ? val.hashCode() : 0);
+ return result;
+ }
+
+ public static Object[] getVals(List<Column> columns) {
+ List<Object> vals = new ArrayList<>(columns.size());
+ for(Column c : columns)
+ vals.add(c.getVal());
+ return vals.toArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
new file mode 100644
index 0000000..2ab8f92
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Interface that defines how a Storm tuple maps to a list of columns representing a row in a database.
+ */
+public interface CqlMapper extends Serializable {
+
+ /**
+ * Maps the specified input tuple to a list of CQL columns.
+ * @param tuple the input tuple.
+ * @return the list of columns mapped from the tuple.
+ */
+ List<Column> map(ITuple tuple);
+
+ /** {@link CqlMapper} that produces one column per configured {@link FieldSelector}, in selector order. */
+ public static final class SelectableCqlMapper implements CqlMapper {
+
+ private final List<FieldSelector> selectors;
+
+ /**
+ * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.SelectableCqlMapper} instance.
+ * @param selectors list of selectors used to extract column values from tuple.
+ */
+ public SelectableCqlMapper(List<FieldSelector> selectors) {
+ this.selectors = selectors;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Column> map(ITuple tuple) {
+ List<Column> columns = new ArrayList<>(selectors.size());
+ for(FieldSelector selector : selectors)
+ columns.add(selector.select(tuple));
+ return columns;
+ }
+ }
+
+ /**
+ * Default {@link CqlMapper} that maps every tuple field to a column named after that field.
+ */
+ public static final class DefaultCqlMapper implements CqlMapper {
+
+ /**
+ * Creates a new {@link org.apache.storm.cassandra.query.CqlMapper.DefaultCqlMapper} instance.
+ */
+ public DefaultCqlMapper() {}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Column> map(ITuple tuple) {
+ List<Column> columns = new ArrayList<>(tuple.size());
+ for(String name : tuple.getFields())
+ columns.add(new Column(name, tuple.getValueByField(name)));
+ return columns;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
deleted file mode 100644
index 683824e..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/SimpleCQLStatementTupleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Default interface to map a {@link backtype.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}.
- *
- */
-public abstract class SimpleCQLStatementTupleMapper implements CQLStatementTupleMapper, Serializable {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Statement> map(Map conf, Session session, ITuple tuple) {
- return Arrays.asList(map(tuple));
- }
-
- /**
- * Maps a given tuple to a single CQL statements.
- *
- * @param tuple the incoming tuple to map.
- * @return a list of {@link com.datastax.driver.core.Statement}.
- */
- public abstract Statement map(ITuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..52502b0
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.BoundCQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.impl.PreparedStatementBinder;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.storm.cassandra.query.ContextQuery.*;
+
+public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<BoundCQLStatementTupleMapper>, Serializable {
+
+ private final ContextQuery contextQuery;
+
+ private CqlMapper mapper;
+
+ private List<String> routingKeys;
+
+ private PreparedStatementBinder binder;
+
+ /**
+ * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+ * @param cql the CQL query string, wrapped in a {@code StaticContextQuery}.
+ */
+ public BoundCQLStatementMapperBuilder(String cql) {
+ this.contextQuery = new StaticContextQuery(cql);
+ }
+
+ /**
+ * Creates a new {@link BoundCQLStatementMapperBuilder} instance.
+ * @param contextQuery the query provider handed to the built mapper.
+ */
+ public BoundCQLStatementMapperBuilder(ContextQuery contextQuery) {
+ this.contextQuery = contextQuery;
+ }
+
+ /**
+ * Maps tuples to columns using only the specified field selectors.
+ *
+ * @param fields a list of field selector.
+ */
+ public final BoundCQLStatementMapperBuilder bind(final FieldSelector... fields) {
+ this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+ return this;
+ }
+
+ /**
+ * Sets the mapper used to convert tuples into columns, replacing any previously set one.
+ *
+ * @param mapper a column mapper.
+ */
+ public final BoundCQLStatementMapperBuilder bind(CqlMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+ /** Sets the keys handed to a {@link RoutingKeyGenerator}; presumably tuple field names (TODO confirm). */
+ public final BoundCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+ this.routingKeys = Arrays.asList(fields);
+ return this;
+ }
+ /** Selects {@link PreparedStatementBinder.CQL3NamedSettersBinder} instead of the default binder. */
+ public final BoundCQLStatementMapperBuilder byNamedSetters() {
+ this.binder = new PreparedStatementBinder.CQL3NamedSettersBinder();
+ return this;
+ }
+
+ /**
+ * {@inheritDoc} A mapper must have been set via {@code bind}; the built mapper's constructor rejects null.
+ */
+ @Override
+ public BoundCQLStatementTupleMapper build() {
+ return new BoundCQLStatementTupleMapper(contextQuery, mapper, getRoutingKeyGenerator(), getStatementBinder());
+ }
+
+ private RoutingKeyGenerator getRoutingKeyGenerator() {
+ return (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null;
+ }
+
+ private PreparedStatementBinder getStatementBinder() {
+ return (binder != null) ? binder : new PreparedStatementBinder.DefaultBinder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
new file mode 100644
index 0000000..401f1bf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.builder;
+
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import org.apache.storm.cassandra.query.*;
+import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator;
+import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
+import org.apache.storm.cassandra.query.selector.FieldSelector;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Builder for {@link org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper} instances.
+ */
+public class SimpleCQLStatementMapperBuilder implements CQLStatementBuilder<SimpleCQLStatementMapper>, Serializable {
+
+ private final String queryString;
+
+ private CqlMapper mapper;
+
+ private List<String> routingKeys;
+
+ /**
+ * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+ * @param queryString a valid CQL query string.
+ */
+ public SimpleCQLStatementMapperBuilder(String queryString) {
+ this.queryString = queryString;
+ }
+
+ /**
+ * Creates a new {@link SimpleCQLStatementMapperBuilder} instance.
+ * @param builtStatement a built statement whose query string is used.
+ */
+ public SimpleCQLStatementMapperBuilder(BuiltStatement builtStatement) {
+ this.queryString = builtStatement.getQueryString();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SimpleCQLStatementMapper build() {
+ return new SimpleCQLStatementMapper(queryString, mapper, (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null );
+ }
+
+ /**
+ * Maps tuples to columns using only the specified field selectors.
+ *
+ * @param fields a list of field selector.
+ */
+ public final SimpleCQLStatementMapperBuilder with(final FieldSelector... fields) {
+ this.mapper = new CqlMapper.SelectableCqlMapper(Arrays.asList(fields));
+ return this;
+ }
+ /** Sets the keys handed to a {@link RoutingKeyGenerator}; presumably tuple field names (TODO confirm). */
+ public final SimpleCQLStatementMapperBuilder withRoutingKeys(String...fields) {
+ this.routingKeys = Arrays.asList(fields);
+ return this;
+ }
+
+ /**
+ * Sets the mapper used to convert tuples into columns, replacing any previously set one.
+ *
+ * @param mapper a column mapper.
+ */
+ public final SimpleCQLStatementMapperBuilder with(CqlMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
new file mode 100644
index 0000000..fe948f5
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+public class BatchCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+ private final List<CQLStatementTupleMapper> mappers;
+ private final BatchStatement.Type type;
+
+ /**
+ * Creates a new {@link BatchCQLStatementTupleMapper} instance.
+ * @param type the type given to every {@link BatchStatement} created by this mapper.
+ * @param mappers the mappers whose statements are grouped into the batch; the list is copied.
+ */
+ public BatchCQLStatementTupleMapper(BatchStatement.Type type, List<CQLStatementTupleMapper> mappers) {
+ this.mappers = new ArrayList<>(mappers);
+ this.type = type;
+ }
+
+ /**
+ * {@inheritDoc} Returns a single-element list holding one batch with the statements of every mapper.
+ */
+ @Override
+ public List<Statement> map(Map conf, Session session, ITuple tuple) {
+ final BatchStatement batch = new BatchStatement(this.type);
+ for(CQLStatementTupleMapper m : mappers)
+ batch.addAll(m.map(conf, session, tuple));
+ return Arrays.asList((Statement)batch);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
new file mode 100644
index 0000000..8cce418
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+import org.apache.storm.cassandra.query.ContextQuery;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper {
+
+ private final ContextQuery contextQuery;
+
+ private final CqlMapper mapper;
+ // Prepared statements keyed by resolved query string; entries are never evicted.
+ private Map<String, PreparedStatement> cache = new HashMap<>();
+
+ private final RoutingKeyGenerator rkGenerator;
+
+ private final PreparedStatementBinder binder;
+
+ /**
+ * Creates a new {@link BoundCQLStatementTupleMapper} instance.
+ *
+ * @param contextQuery resolves the CQL query string for each tuple; must not be null.
+ * @param mapper maps each tuple to the list of columns to bind;
+ * must not be null.
+ * @param rkGenerator generates routing keys from a tuple, or null to set no routing key.
+ * @param binder binds the mapped columns to the prepared statement.
+ */
+ public BoundCQLStatementTupleMapper(ContextQuery contextQuery, CqlMapper mapper, RoutingKeyGenerator rkGenerator, PreparedStatementBinder binder) {
+ Preconditions.checkNotNull(contextQuery, "ContextQuery must not be null");
+ Preconditions.checkNotNull(mapper, "Mapper should not be null");
+ this.contextQuery = contextQuery;
+ this.mapper = mapper;
+ this.rkGenerator = rkGenerator;
+ this.binder = binder;
+ }
+
+ /**
+ * {@inheritDoc} Returns a single bound statement; any routing key is set on the prepared statement first.
+ */
+ @Override
+ public List<Statement> map(Map config, Session session, ITuple tuple) {
+ final List<Column> columns = mapper.map(tuple);
+
+ final String query = contextQuery.resolves(config, tuple);
+
+ PreparedStatement statement = getPreparedStatement(session, query);
+ if(hasRoutingKeys()) {
+ List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple);
+ if( keys.size() == 1)
+ statement.setRoutingKey(keys.get(0));
+ else
+ statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()]));
+ }
+
+ return Arrays.asList((Statement) this.binder.apply(statement, columns));
+ }
+
+ private boolean hasRoutingKeys() {
+ return rkGenerator != null;
+ }
+
+ /**
+ * Get or prepare a statement using the specified session and the query.
+ *
+ * @param session The cassandra session.
+ * @param query The CQL query to prepare.
+ */
+ private PreparedStatement getPreparedStatement(Session session, String query) {
+ PreparedStatement statement = cache.get(query);
+ if( statement == null) {
+ statement = session.prepare(query);
+ cache.put(query, statement);
+ }
+ return statement;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
deleted file mode 100644
index b7ded14..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.ContextQuery;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
-
-
-public class BoundStatementMapperBuilder implements Serializable {
- private final ContextQuery contextQuery;
-
- /**
- * Creates a new {@link BoundStatementMapperBuilder} instance.
- * @param cql
- */
- public BoundStatementMapperBuilder(String cql) {
- this.contextQuery = new StaticContextQuery(cql);
- }
-
- /**
- * Creates a new {@link BoundStatementMapperBuilder} instance.
- * @param contextQuery
- */
- public BoundStatementMapperBuilder(ContextQuery contextQuery) {
- this.contextQuery = contextQuery;
- }
-
- public CQLStatementTupleMapper bind(final CQLValuesTupleMapper mapper) {
- return new CQLBoundStatementTupleMapper(contextQuery, mapper);
- }
-
- public static class CQLBoundStatementTupleMapper implements CQLStatementTupleMapper {
-
- private final ContextQuery contextQuery;
-
- private final CQLValuesTupleMapper mapper;
-
- // should be a lru-map or weakhashmap else this may lead to memory leaks.
- private Map<String, PreparedStatement> cache = new HashMap<>();
-
- /**
- * Creates a new {@link CQLBoundStatementTupleMapper} instance.
- *
- * @param contextQuery
- * @param mapper
- */
- CQLBoundStatementTupleMapper(ContextQuery contextQuery, CQLValuesTupleMapper mapper) {
- this.contextQuery = contextQuery;
- this.mapper = mapper;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<Statement> map(Map config, Session session, ITuple tuple) {
- final Map<String, Object> values = mapper.map(tuple);
- final String query = contextQuery.resolves(config, tuple);
- Object[] objects = values.values().toArray(new Object[values.size()]);
- PreparedStatement statement = getPreparedStatement(session, query);
- // todo bind objects in the same sequence as the statement expects.
- return Arrays.asList((Statement) statement.bind(objects));
- }
-
- /**
- * Get or prepare a statement using the specified session and the query.
- * *
- * @param session The cassandra session.
- * @param query The CQL query to prepare.
- */
- private PreparedStatement getPreparedStatement(Session session, String query) {
- PreparedStatement statement = cache.get(query);
- if (statement == null) {
- statement = session.prepare(query);
- cache.put(query, statement);
- }
- return statement;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
deleted file mode 100644
index 8eddb04..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/InsertStatementBuilder.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.base.Preconditions;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLTableTupleMapper;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class InsertStatementBuilder<T extends CQLValuesTupleMapper> implements CQLStatementBuilder {
- /**
- * Optional name of the table into insert.
- */
- protected final String table;
- /**
- * Optional keyspace name to insert.
- */
- protected final String keyspace;
- /**
- * Optional mapper to retrieve a table name from tuple.
- */
- protected final CQLTableTupleMapper mapper2TableName;
-
- protected T valuesMapper;
-
- protected boolean ifNotExist;
-
- protected Integer ttlInSeconds;
-
- protected ConsistencyLevel level;
-
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param table name of the table into insert.
- */
- public InsertStatementBuilder(String table) {
- this(null, table, null);
- }
-
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param table name of the table into insert.
- * @param keyspace the keyspace's name to which the table belongs
- */
- public InsertStatementBuilder(String table, String keyspace) {
- this(keyspace, table, null);
- }
-
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param mapper2TableName the mapper that specify the table in which insert
- * @param keyspace the name of the keyspace to which the table belongs
- */
- public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName, String keyspace) {
- this(keyspace, null, mapper2TableName);
- }
- /**
- * Creates a new {@link InsertStatementBuilder} instance.
- *
- * @param mapper2TableName the mapper that specify the table in which insert
- */
- public InsertStatementBuilder(CQLTableTupleMapper mapper2TableName) {
- this(null, null, mapper2TableName);
- }
-
- private InsertStatementBuilder(String keyspace, String table, CQLTableTupleMapper mapper2TableName) {
- this.keyspace = keyspace;
- this.table = table;
- this.mapper2TableName = mapper2TableName;
- }
-
- /**
- * Adds "IF NOT EXISTS" clause to the statement.
- * @see com.datastax.driver.core.querybuilder.Insert#ifNotExists()
- */
- public InsertStatementBuilder ifNotExists() {
- this.ifNotExist = true;
- return this;
- }
-
- public InsertStatementBuilder usingTTL(Long time, TimeUnit unit) {
- this.ttlInSeconds = (int) unit.toSeconds(time);
- return this;
- }
-
- /**
- * Sets the consistency level used for this statement.
- *
- * @param level a ConsistencyLevel.
- */
- public InsertStatementBuilder withConsistencyLevel(ConsistencyLevel level) {
- this.level = level;
- return this;
- }
-
- /**
- * Maps tuple to values.
- *
- * @see com.datastax.driver.core.querybuilder.Insert#value(String, Object)
- */
- public InsertStatementBuilder values(final T valuesMapper) {
- this.valuesMapper = valuesMapper;
- return this;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public SimpleCQLStatementTupleMapper build() {
- Preconditions.checkState(null != table, "Table name must not be null");
- return new SimpleCQLStatementTupleMapper() {
- @Override
- public Statement map(ITuple tuple) {
- Insert stmt = QueryBuilder.insertInto(keyspace, table);
- for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
- stmt.value(entry.getKey(), entry.getValue());
- if (ttlInSeconds != null) stmt.using(QueryBuilder.ttl(ttlInSeconds));
- if (ifNotExist) stmt.ifNotExists();
- if (level != null) stmt.setConsistencyLevel(level);
- return stmt;
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
new file mode 100644
index 0000000..7a6c78d
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import com.datastax.driver.core.*;
+import org.apache.storm.cassandra.query.Column;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Strategy used to bind a list of {@link Column} values to a {@link PreparedStatement}.
+ */
+public interface PreparedStatementBinder extends Serializable {
+
+    /**
+     * Binds the given columns to the prepared statement.
+     *
+     * @param statement the prepared statement to bind.
+     * @param columns the columns carrying the values to bind.
+     * @return the resulting bound statement.
+     */
+    public BoundStatement apply(PreparedStatement statement, List<Column> columns);
+
+    /**
+     * Binds the array returned by {@code Column.getVals(columns)} in a single {@code bind} call.
+     */
+    public static final class DefaultBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            Object[] values = Column.getVals(columns);
+            return statement.bind(values);
+        }
+    }
+
+    /**
+     * Binds each column by name through the typed setters of {@link BoundStatement}.
+     */
+    public static final class CQL3NamedSettersBinder implements PreparedStatementBinder {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public BoundStatement apply(PreparedStatement statement, List<Column> columns) {
+            BoundStatement boundStatement = statement.bind();
+            for (Column col : columns) {
+                // For native protocol V3 or below, all variables must be bound.
+                // With native protocol V4 or above, variables can be left unset,
+                // in which case they will be ignored server side (no tombstones will be generated).
+                if (col.isNull()) {
+                    boundStatement.setToNull(col.getColumnName());
+                } else {
+                    bind(boundStatement, col.getColumnName(), col.getVal());
+                }
+            }
+            // Return the statement populated above; binding again positionally would drop the named values.
+            return boundStatement;
+        }
+
+        /**
+         * Sets {@code value} on the variable {@code name} using the setter matching its Java type.
+         * This ugly method comes from {@link com.datastax.driver.core.TypeCodec#getDataTypeFor(Object)}.
+         *
+         * @param statement the bound statement to populate.
+         * @param name the name of the variable to set.
+         * @param value the value to set; must not be null.
+         * @throws IllegalArgumentException if the value type does not correspond to any CQL3 type.
+         */
+        static void bind(BoundStatement statement, String name, Object value) {
+            // Starts with ByteBuffer, so that if already serialized value are provided, we don't have the
+            // cost of testing a bunch of other types first
+            if (value instanceof ByteBuffer) {
+                statement.setBytes(name, (ByteBuffer) value);
+            } else if (value instanceof Integer) {
+                statement.setInt(name, (Integer) value);
+            } else if (value instanceof Long) {
+                statement.setLong(name, (Long) value);
+            } else if (value instanceof Float) {
+                statement.setFloat(name, (Float) value);
+            } else if (value instanceof Double) {
+                statement.setDouble(name, (Double) value);
+            } else if (value instanceof BigDecimal) {
+                statement.setDecimal(name, (BigDecimal) value);
+            } else if (value instanceof BigInteger) {
+                statement.setVarint(name, (BigInteger) value);
+            } else if (value instanceof String) {
+                statement.setString(name, (String) value);
+            } else if (value instanceof Boolean) {
+                statement.setBool(name, (Boolean) value);
+            } else if (value instanceof InetAddress) {
+                statement.setInet(name, (InetAddress) value);
+            } else if (value instanceof Date) {
+                statement.setDate(name, (Date) value);
+            } else if (value instanceof UUID) {
+                statement.setUUID(name, (UUID) value);
+            } else if (value instanceof List) {
+                statement.setList(name, (List<?>) value);
+            } else if (value instanceof Set) {
+                statement.setSet(name, (Set<?>) value);
+            } else if (value instanceof Map) {
+                statement.setMap(name, (Map<?, ?>) value);
+            } else if (value instanceof UDTValue) {
+                statement.setUDTValue(name, (UDTValue) value);
+            } else if (value instanceof TupleValue) {
+                statement.setTupleValue(name, (TupleValue) value);
+            } else {
+                throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
new file mode 100644
index 0000000..3f4f47e
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RoutingKeyGenerator implements Serializable {
+
+    private List<String> routingKeys;
+
+    /**
+     * Creates a new {@link RoutingKeyGenerator} instance.
+     * @param routingKeys the names of the tuple fields to serialize; must not be null.
+     */
+    public RoutingKeyGenerator(List<String> routingKeys) {
+        this.routingKeys = Preconditions.checkNotNull(routingKeys);
+    }
+
+    /** Serializes the value of every configured field of {@code tuple}, in list order. */
+    public List<ByteBuffer> getRoutingKeys(ITuple tuple) {
+        List<ByteBuffer> serialized = new ArrayList<>(routingKeys.size());
+        for (String fieldName : routingKeys) {
+            Object fieldValue = tuple.getValueByField(fieldName);
+            serialized.add(DataType.serializeValue(fieldValue, ProtocolVersion.NEWEST_SUPPORTED));
+        }
+        return serialized;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
new file mode 100644
index 0000000..ab9adbf
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/SimpleCQLStatementMapper.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query.impl;
+
+import backtype.storm.tuple.ITuple;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.query.Column;
+import org.apache.storm.cassandra.query.CqlMapper;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maps a tuple to a single {@link SimpleStatement} built from a CQL query string.
+ */
+public class SimpleCQLStatementMapper implements CQLStatementTupleMapper {
+
+    private final String queryString;
+    private final CqlMapper mapper;
+    private final RoutingKeyGenerator rkGenerator;
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance without a routing key generator.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper) {
+        this(queryString, mapper, null);
+    }
+
+    /**
+     * Creates a new {@link SimpleCQLStatementMapper} instance.
+     * @param queryString the cql query string to execute.
+     * @param mapper the mapper.
+     * @param rkGenerator the routing key generator, or {@code null} to set no routing key.
+     */
+    public SimpleCQLStatementMapper(String queryString, CqlMapper mapper, RoutingKeyGenerator rkGenerator) {
+        this.queryString = Preconditions.checkNotNull(queryString, "Query string must not be null");
+        this.mapper = Preconditions.checkNotNull(mapper, "Mapper should not be null");
+        this.rkGenerator = rkGenerator;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public List<Statement> map(Map conf, Session session, ITuple tuple) {
+        Object[] boundValues = Column.getVals(mapper.map(tuple));
+        SimpleStatement cql = new SimpleStatement(queryString, boundValues);
+
+        if (hasRoutingKeys()) {
+            List<ByteBuffer> routing = rkGenerator.getRoutingKeys(tuple);
+            if (routing.size() != 1) {
+                cql.setRoutingKey(routing.toArray(new ByteBuffer[routing.size()]));
+            } else {
+                cql.setRoutingKey(routing.get(0));
+            }
+        }
+        return Arrays.<Statement>asList(cql);
+    }
+
+    /** Tells whether a routing key generator was supplied at construction. */
+    private boolean hasRoutingKeys() {
+        return rkGenerator != null;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
deleted file mode 100644
index 70696ab..0000000
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/UpdateStatementBuilder.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.query.impl;
-
-import backtype.storm.tuple.ITuple;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Clause;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Update;
-import org.apache.storm.cassandra.query.CQLClauseTupleMapper;
-import org.apache.storm.cassandra.query.CQLStatementBuilder;
-import org.apache.storm.cassandra.query.CQLValuesTupleMapper;
-import org.apache.storm.cassandra.query.SimpleCQLStatementTupleMapper;
-
-import java.util.Map;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
-
-public final class UpdateStatementBuilder implements CQLStatementBuilder {
-
- /**
- * The name of table to update.
- */
- private final String table;
- /**
- * The keyspace of the table.
- */
- private final String keyspace;
-
- private CQLValuesTupleMapper valuesMapper;
-
- private CQLClauseTupleMapper clausesMapper;
-
- private CQLClauseTupleMapper onlyIfClausesMapper;
- /**
- * Creates a new {@link UpdateStatementBuilder} instance.
- * @param table the name of table to update.
- */
- public UpdateStatementBuilder(String table) {
- this(table, null);
- }
-
- /**
- * Creates a new {@link UpdateStatementBuilder} instance.
- * @param table the name of table to update.
- * @param keyspace the keyspace of the table.
- */
- public UpdateStatementBuilder(String table, String keyspace) {
- this.table = table;
- this.keyspace = keyspace;
- }
-
- /**
- * Maps output tuple to values.
- * @see com.datastax.driver.core.querybuilder.Update#with(com.datastax.driver.core.querybuilder.Assignment)
- */
- public UpdateStatementBuilder with(final CQLValuesTupleMapper values) {
- this.valuesMapper = values;
- return this;
- }
-
- /**
- * Maps output tuple to some Where clauses.
- * @see com.datastax.driver.core.querybuilder.Update#where(com.datastax.driver.core.querybuilder.Clause)
- */
- public UpdateStatementBuilder where(final CQLClauseTupleMapper clauses) {
- this.clausesMapper = clauses;
- return this;
- }
-
- /**
- * Maps output tuple to some If clauses.
- * @see com.datastax.driver.core.querybuilder.Update#onlyIf(com.datastax.driver.core.querybuilder.Clause)
- */
- public UpdateStatementBuilder onlyIf(final CQLClauseTupleMapper clauses) {
- this.onlyIfClausesMapper = clauses;
- return this;
- }
-
- @Override
- public SimpleCQLStatementTupleMapper build() {
- return new SimpleCQLStatementTupleMapper() {
- @Override
- public Statement map(ITuple tuple) {
- Update stmt = QueryBuilder.update(keyspace, table);
- for(Map.Entry<String, Object> entry : valuesMapper.map(tuple).entrySet())
- stmt.with(set(entry.getKey(), entry.getValue()));
- for(Clause clause : clausesMapper.map(tuple))
- stmt.where(clause);
- if( hasIfClauses())
- for(Clause clause : onlyIfClausesMapper.map(tuple)) {
- stmt.onlyIf(clause);
- }
- return stmt;
- }
- };
- }
-
- private boolean hasIfClauses() {
- return onlyIfClausesMapper != null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/c2997cf9/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
index b68138a..8ce651e 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/selector/FieldSelector.java
@@ -20,6 +20,7 @@ package org.apache.storm.cassandra.query.selector;
import backtype.storm.tuple.ITuple;
import com.datastax.driver.core.utils.UUIDs;
+import org.apache.storm.cassandra.query.Column;
import java.io.Serializable;
import java.util.Map;
@@ -41,9 +42,8 @@ public class FieldSelector implements Serializable {
this.field = field;
}
- public void selectAndPut(ITuple t, Map<String, Object> values) {
- values.put(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
-
+ public Column select(ITuple t) {
+ return new Column<>(as != null ? as : field, isNow ? UUIDs.timeBased() : t.getValueByField(field));
}
/**