You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/11 07:41:20 UTC
[1/6] storm git commit: STORM-1211 Addressed review comments
Repository: storm
Updated Branches:
refs/heads/master 84820265b -> 6d7b96885
STORM-1211 Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c56eb7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c56eb7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c56eb7e
Branch: refs/heads/master
Commit: 4c56eb7e47a723ec870c5783d68d8320381e2dff
Parents: 5cc697b
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 3 21:09:00 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Dec 8 11:40:23 2015 +0530
----------------------------------------------------------------------
.../cassandra/trident/state/CassandraState.java | 35 ++++++++++++--------
.../trident/state/CassandraStateFactory.java | 2 +-
2 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4c56eb7e/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
index 05618ee..b807a60 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
@@ -18,7 +18,6 @@
*/
package org.apache.storm.cassandra.trident.state;
-import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Values;
import com.datastax.driver.core.BatchStatement;
@@ -53,7 +52,7 @@ public class CassandraState implements State {
private Session session;
private SimpleClient client;
- public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) {
+ protected CassandraState(Map conf, Options options) {
this.conf = conf;
this.options = options;
}
@@ -62,8 +61,7 @@ public class CassandraState implements State {
private final SimpleClientProvider clientProvider;
private CQLStatementTupleMapper cqlStatementTupleMapper;
private CQLResultSetValuesMapper cqlResultSetValuesMapper;
- private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED;
-
+ private BatchStatement.Type batchingType;
public Options(SimpleClientProvider clientProvider) {
this.clientProvider = clientProvider;
@@ -88,36 +86,48 @@ public class CassandraState implements State {
@Override
public void beginCommit(Long txid) {
- LOG.debug("beginCommit is noop");
+ LOG.debug("beginCommit is no operation");
}
@Override
public void commit(Long txid) {
- LOG.debug("commit is noop");
+ LOG.debug("commit is no operation");
}
public void prepare() {
+ Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+
client = options.clientProvider.getClient(conf);
session = client.connect();
}
public void cleanup() {
- session.close();
- client.close();
+ try {
+ session.close();
+ } catch (Exception e) {
+ LOG.warn("Error occurred while closing Session", e);
+ } finally {
+ client.close();
+ }
}
public void updateState(List<TridentTuple> tuples, final TridentCollector collector) {
- Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
List<Statement> statements = new ArrayList<>();
for (TridentTuple tuple : tuples) {
statements.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple));
}
- BatchStatement batchStatement = new BatchStatement(options.batchingType);
- batchStatement.addAll(statements);
try {
- session.execute(batchStatement);
+ if (options.batchingType != null) {
+ BatchStatement batchStatement = new BatchStatement(options.batchingType);
+ batchStatement.addAll(statements);
+ session.execute(batchStatement);
+ } else {
+ for (Statement statement : statements) {
+ session.execute(statement);
+ }
+ }
} catch (Exception e) {
LOG.warn("Batch write operation is failed.");
collector.reportError(e);
@@ -127,7 +137,6 @@ public class CassandraState implements State {
}
public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
- Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
Preconditions.checkNotNull(options.cqlResultSetValuesMapper, "CassandraState.Options should have cqlResultSetValuesMapper");
List<List<Values>> batchRetrieveResult = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/storm/blob/4c56eb7e/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
index 285fa38..ceaa11d 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
@@ -43,7 +43,7 @@ public class CassandraStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- CassandraState cassandraState = new CassandraState(conf, metrics, partitionIndex, numPartitions, options);
+ CassandraState cassandraState = new CassandraState(conf, options);
cassandraState.prepare();
return cassandraState;
[6/6] storm git commit: Added STORM-1211 to Changelog.
Posted by sr...@apache.org.
Added STORM-1211 to Changelog.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6d7b9688
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6d7b9688
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6d7b9688
Branch: refs/heads/master
Commit: 6d7b96885a5ef3d8e957bba5ce2810f35127a471
Parents: 0be444e
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Dec 10 21:29:07 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Dec 10 21:29:07 2015 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6d7b9688/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8cace6e..3b1a9f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1211: Add trident state and query support for cassandra connector
* STORM-1359: Change kryo links from google code to github
* STORM-1385: Divide by zero exception in stats.clj
* STORM-1370: Bug fixes for MultitenantScheduler
[2/6] storm git commit: STORM-1211 Added trident support for
Cassandra connector.
Posted by sr...@apache.org.
STORM-1211 Added trident support for Cassandra connector.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef9479d0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef9479d0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef9479d0
Branch: refs/heads/master
Commit: ef9479d06add8f70517d410d7e12a6c21940bd75
Parents: 10b33a6
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 3 09:43:05 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Dec 8 11:40:23 2015 +0530
----------------------------------------------------------------------
.../cassandra/client/impl/DefaultClient.java | 22 +--
.../query/CQLResultSetValuesMapper.java | 36 +++++
.../query/impl/BoundStatementMapperBuilder.java | 8 +-
.../cassandra/trident/state/CassandraQuery.java | 44 ++++++
.../cassandra/trident/state/CassandraState.java | 149 +++++++++++++++++++
.../trident/state/CassandraStateFactory.java | 51 +++++++
.../trident/state/CassandraStateUpdater.java | 36 +++++
.../state/TridentResultSetValuesMapper.java | 63 ++++++++
.../cassandra/DynamicStatementBuilderTest.java | 46 +++---
.../apache/storm/cassandra/WeatherSpout.java | 4 +-
.../bolt/BatchCassandraWriterBoltTest.java | 13 +-
.../cassandra/bolt/CassandraWriterBoltTest.java | 13 +-
.../cassandra/trident/TridentTopologyTest.java | 125 ++++++++++++++++
.../cassandra/trident/WeatherBatchSpout.java | 104 +++++++++++++
.../src/test/resources/schema.cql | 12 +-
15 files changed, 679 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
index 945d0a8..6eb8e16 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -36,25 +36,25 @@ import java.util.Set;
* Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
*/
public class DefaultClient implements SimpleClient, Closeable, Serializable {
-
+
private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
-
+
private String keyspace;
private Cluster cluster;
-
+
private Session session;
/**
* Create a new {@link DefaultClient} instance.
- *
+ *
* @param cluster a cassandra cluster client.
*/
public DefaultClient(Cluster cluster, String keyspace) {
Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
this.cluster = cluster;
this.keyspace = keyspace;
-
+
}
public Set<Host> getAllHosts() {
@@ -71,14 +71,15 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
Thread thread = Thread.currentThread();
return thread.getName();
}
+
/**
* {@inheritDoc}
*/
@Override
public synchronized Session connect() throws NoHostAvailableException {
- if( isDisconnected() ) {
+ if (isDisconnected()) {
LOG.info("Connected to cluster: {}", cluster.getClusterName());
- for ( Host host : getAllHosts())
+ for (Host host : getAllHosts())
LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
LOG.info("Connect to cluster using keyspace %s", keyspace);
@@ -87,7 +88,7 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName());
}
- if( session.isClosed() ) {
+ if (session.isClosed()) {
LOG.warn("Session has been closed - create new one!");
this.session = cluster.newSession();
}
@@ -105,13 +106,14 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
* {@inheritDoc}
*/
@Override
- public void close( ) {
- if( cluster != null && !cluster.isClosed() ) {
+ public void close() {
+ if (cluster != null && !cluster.isClosed()) {
LOG.info("Try to close connection to cluster: {}", cluster.getClusterName());
session.close();
cluster.close();
}
}
+
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
new file mode 100644
index 0000000..80b1173
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ *
+ */
+public interface CQLResultSetValuesMapper extends Serializable {
+
+ List<List<Values>> map(Session session, Statement statement, ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
index 4348253..b7ded14 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.storm.cassandra.query.ContextQuery.*;
+import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
public class BoundStatementMapperBuilder implements Serializable {
@@ -64,6 +64,7 @@ public class BoundStatementMapperBuilder implements Serializable {
private final CQLValuesTupleMapper mapper;
+ // should be a lru-map or weakhashmap else this may lead to memory leaks.
private Map<String, PreparedStatement> cache = new HashMap<>();
/**
@@ -86,7 +87,8 @@ public class BoundStatementMapperBuilder implements Serializable {
final String query = contextQuery.resolves(config, tuple);
Object[] objects = values.values().toArray(new Object[values.size()]);
PreparedStatement statement = getPreparedStatement(session, query);
- return Arrays.asList((Statement)statement.bind(objects));
+ // todo bind objects in the same sequence as the statement expects.
+ return Arrays.asList((Statement) statement.bind(objects));
}
/**
@@ -97,7 +99,7 @@ public class BoundStatementMapperBuilder implements Serializable {
*/
private PreparedStatement getPreparedStatement(Session session, String query) {
PreparedStatement statement = cache.get(query);
- if( statement == null) {
+ if (statement == null) {
statement = session.prepare(query);
cache.put(query, statement);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
new file mode 100644
index 0000000..085cbca
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class CassandraQuery extends BaseQueryFunction<CassandraState, List<Values>> {
+
+ @Override
+ public List<List<Values>> batchRetrieve(CassandraState state, List<TridentTuple> tridentTuples) {
+ return state.batchRetrieve(tridentTuples);
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, List<Values> valuesList, TridentCollector collector) {
+ for (Values values : valuesList) {
+ collector.emit(values);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
new file mode 100644
index 0000000..05618ee
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class);
+
+ private final Map conf;
+ private final Options options;
+
+ private Session session;
+ private SimpleClient client;
+
+ public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) {
+ this.conf = conf;
+ this.options = options;
+ }
+
+ public static final class Options implements Serializable {
+ private final SimpleClientProvider clientProvider;
+ private CQLStatementTupleMapper cqlStatementTupleMapper;
+ private CQLResultSetValuesMapper cqlResultSetValuesMapper;
+ private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED;
+
+
+ public Options(SimpleClientProvider clientProvider) {
+ this.clientProvider = clientProvider;
+ }
+
+ public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) {
+ this.cqlStatementTupleMapper = cqlStatementTupleMapper;
+ return this;
+ }
+
+ public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+ this.cqlResultSetValuesMapper = cqlResultSetValuesMapper;
+ return this;
+ }
+
+ public Options withBatching(BatchStatement.Type batchingType) {
+ this.batchingType = batchingType;
+ return this;
+ }
+
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is noop");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is noop");
+ }
+
+ public void prepare() {
+ client = options.clientProvider.getClient(conf);
+ session = client.connect();
+ }
+
+ public void cleanup() {
+ session.close();
+ client.close();
+ }
+
+ public void updateState(List<TridentTuple> tuples, final TridentCollector collector) {
+ Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+
+ List<Statement> statements = new ArrayList<>();
+ for (TridentTuple tuple : tuples) {
+ statements.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple));
+ }
+ BatchStatement batchStatement = new BatchStatement(options.batchingType);
+ batchStatement.addAll(statements);
+
+ try {
+ session.execute(batchStatement);
+ } catch (Exception e) {
+ LOG.warn("Batch write operation is failed.");
+ collector.reportError(e);
+ throw new FailedException(e);
+ }
+
+ }
+
+ public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+ Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+ Preconditions.checkNotNull(options.cqlResultSetValuesMapper, "CassandraState.Options should have cqlResultSetValuesMapper");
+
+ List<List<Values>> batchRetrieveResult = new ArrayList<>();
+ try {
+ for (TridentTuple tridentTuple : tridentTuples) {
+ List<Statement> statements = options.cqlStatementTupleMapper.map(conf, session, tridentTuple);
+ for (Statement statement : statements) {
+ List<List<Values>> values = options.cqlResultSetValuesMapper.map(session, statement, tridentTuple);
+ batchRetrieveResult.addAll(values);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch retrieve operation is failed.");
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
new file mode 100644
index 0000000..285fa38
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraStateFactory implements StateFactory {
+ private final CassandraState.Options options;
+
+ public CassandraStateFactory(CassandraState.Options options) {
+ this.options = options;
+ }
+
+ public CassandraStateFactory(CQLStatementTupleMapper cqlStatementTupleMapper, CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+ this(new CassandraState.Options(new CassandraContext()).withCQLStatementTupleMapper(cqlStatementTupleMapper).withCQLResultSetValuesMapper(cqlResultSetValuesMapper));
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ CassandraState cassandraState = new CassandraState(conf, metrics, partitionIndex, numPartitions, options);
+ cassandraState.prepare();
+
+ return cassandraState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
new file mode 100644
index 0000000..6453198
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class CassandraStateUpdater extends BaseStateUpdater<CassandraState> {
+
+ @Override
+ public void updateState(CassandraState state, List<TridentTuple> list, TridentCollector collector) {
+ state.updateState(list, collector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
new file mode 100644
index 0000000..be7bda1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *
+ */
+public class TridentResultSetValuesMapper implements CQLResultSetValuesMapper {
+ private Fields outputDeclaredFields;
+
+ public TridentResultSetValuesMapper(Fields outputDeclaredFields) {
+ this.outputDeclaredFields = outputDeclaredFields;
+ }
+
+ @Override
+ public List<List<Values>> map(Session session, Statement statement, ITuple tuple) {
+ List<List<Values>> list = new ArrayList<>();
+ ResultSet resultSet = session.execute(statement);
+ for (Row row : resultSet) {
+ final Values values = new Values();
+ for (String field : outputDeclaredFields) {
+ if (tuple.contains(field)) {
+ values.add(tuple.getValueByField(field));
+ } else {
+ values.add(row.getObject(field));
+ }
+ }
+ list.add(new LinkedList<Values>() {{
+ add(values);
+ }});
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 22e12da..843bf6b 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -36,7 +36,15 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
import static org.mockito.Mockito.when;
public class DynamicStatementBuilderTest {
@@ -46,14 +54,14 @@ public class DynamicStatementBuilderTest {
private static final Tuple mockTuple;
@Rule
- public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather"));
+ public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
static {
mockTuple = Mockito.mock(Tuple.class);
- when(mockTuple.getValueByField("weatherstation_id")).thenReturn("1");
+ when(mockTuple.getValueByField("weather_station_id")).thenReturn("1");
when(mockTuple.getValueByField("event_time")).thenReturn(NOW);
when(mockTuple.getValueByField("temperature")).thenReturn("0°C");
- when(mockTuple.getFields()).thenReturn(new Fields("weatherstation_id", "event_time", "temperature"));
+ when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
}
@Test
@@ -63,51 +71,51 @@ public class DynamicStatementBuilderTest {
insertInto("weather", "temperature").values(all()),
insertInto("weather", "temperature").values(all())
),
- "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
- "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
+ "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
+ "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
);
}
@Test
public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
executeStatementAndAssert(insertInto("weather", "temperature").values(all()).build(),
- "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
executeStatementAndAssert(insertInto("temperature").values(all()).build(),
- "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
- executeStatementAndAssert(insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature"))).build(),
- "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ executeStatementAndAssert(insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature"))).build(),
+ "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
executeBatchStatementAndAssert(loggedBatch(
- insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature")))
- ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
+ ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
executeBatchStatementAndAssert(unLoggedBatch(
- insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature")))
- ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
+ ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
private void executeBatchStatementAndAssert(CQLStatementTupleMapper mapper, String... results) {
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- BatchStatement statement = (BatchStatement)map.get(0);
+ BatchStatement statement = (BatchStatement) map.get(0);
Collection<Statement> statements = statement.getStatements();
Assert.assertEquals(results.length, statements.size());
- for(Statement s : statements)
+ for (Statement s : statements)
Assert.assertTrue(Arrays.asList(results).contains(s.toString()));
}
@@ -116,15 +124,15 @@ public class DynamicStatementBuilderTest {
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
List<String> listExpected = Arrays.asList(expected);
- for( int i=0; i< map.size(); i++) {
+ for (int i = 0; i < map.size(); i++) {
Assert.assertEquals(listExpected.get(i), map.get(i).toString());
}
}
@Test
public void shouldBuildStaticBoundStatement() {
- CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weatherstation_id, event_time, temperature) VALUES(?, ?, ?)")
- .bind(with(field("weatherstation_id"), field("event_time").now(), field("temperature")));
+ CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weather_station_id, event_time, temperature) VALUES(?, ?, ?)")
+ .bind(with(field("weather_station_id"), field("event_time").now(), field("temperature")));
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Statement statement = map.get(0);
Assert.assertNotNull(statement);
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
index 49a1f80..0b7d8a5 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
@@ -52,7 +52,7 @@ public class WeatherSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("weatherstation_id", "temperature"));
+ outputFieldsDeclarer.declare(new Fields("weather_station_id", "temperature"));
}
@Override
@@ -77,7 +77,7 @@ public class WeatherSpout extends BaseRichSpout {
@Override
public void nextTuple() {
- if( emit.get() < maxQueries.get() ) {
+ if (emit.get() < maxQueries.get()) {
spoutOutputCollector.emit(new Values(stationID, "38°C"), emit.incrementAndGet());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 4253189..80eb95a 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -26,7 +26,9 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
/**
@@ -34,16 +36,17 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
*/
public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
- public static final String SPOUT_MOCK = "spout-mock";
+ public static final String SPOUT_MOCK = "spout-mock";
public static final String BOLT_WRITER = "writer";
- @Test @Ignore("The sleep method should be used in tests")
+ @Test
+ @Ignore("The sleep method should be used in tests")
public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
}
private SimpleCQLStatementTupleMapper getInsertInto() {
- return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build();
+ return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
@@ -57,7 +60,7 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
runLocalTopologyAndWait(builder);
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'");
+ ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
Assert.assertEquals(maxQueries, rows.all().size());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index 7717d4d..2aef53c 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -26,23 +26,26 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
/**
*
*/
public class CassandraWriterBoltTest extends BaseTopologyTest {
- public static final String SPOUT_MOCK = "spout-mock";
+ public static final String SPOUT_MOCK = "spout-mock";
public static final String BOLT_WRITER = "writer";
- @Test @Ignore("The sleep method should be used in tests")
+ @Test
+ @Ignore("The sleep method should be used in tests")
public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
}
private SimpleCQLStatementTupleMapper getInsertInto() {
- return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build();
+ return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
@@ -56,7 +59,7 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
runLocalTopologyAndWait(builder);
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'");
+ ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
Assert.assertEquals(maxQueries, rows.all().size());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
new file mode 100644
index 0000000..c775919
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.bolt.BaseTopologyTest;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.trident.state.CassandraQuery;
+import org.apache.storm.cassandra.trident.state.CassandraState;
+import org.apache.storm.cassandra.trident.state.CassandraStateFactory;
+import org.apache.storm.cassandra.trident.state.CassandraStateUpdater;
+import org.apache.storm.cassandra.trident.state.TridentResultSetValuesMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+
+/**
+ *
+ */
+public class TridentTopologyTest extends BaseTopologyTest {
+
+ @Test
+ public void testTridentTopology() throws Exception {
+
+ Session session = cassandraCQLUnit.session;
+ String[] stationIds = {"station-1", "station-2", "station-3"};
+ for (int i = 1; i < 4; i++) {
+ ResultSet resultSet = session.execute("INSERT INTO weather.station(id, name) VALUES(?, ?)", stationIds[i-1],
+ "Foo-Station-" + new Random().nextInt());
+ }
+
+ ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.station");
+ for (Row row : rows) {
+ System.out.println("####### row = " + row);
+ }
+
+ WeatherBatchSpout weatherBatchSpout =
+ new WeatherBatchSpout(new Fields("weather_station_id", "temperature", "event_time"), 3,
+ stationIds);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("cassandra-trident-stream", weatherBatchSpout);
+
+ CassandraStateFactory insertValuesStateFactory = getInsertTemperatureStateFactory();
+
+ CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
+
+ TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+ stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+ stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+
+ stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+
+ StormTopology stormTopology = topology.build();
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", getConfig(), stormTopology);
+ Thread.sleep(30 * 1000);
+
+ rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature");
+ Assert.assertTrue(rows.iterator().hasNext()); // basic sanity check
+
+ cluster.killTopology("wordCounter");
+ cluster.shutdown();
+ }
+
+ public static class PrintFunction extends BaseFunction {
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ System.out.println("####### tuple = " + tuple.getValues());
+ collector.emit(tuple.getValues());
+ }
+ }
+
+ private CassandraStateFactory getInsertTemperatureStateFactory() {
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+ "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+ .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ return new CassandraStateFactory(options);
+ }
+
+ public CassandraStateFactory getSelectWeatherStationStateFactory() {
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+ .bind(with(field("weather_station_id").as("id")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+ return new CassandraStateFactory(options);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
new file mode 100644
index 0000000..62f31db
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class WeatherBatchSpout implements IBatchSpout {
+ private final Fields outputFields;
+ private final int batchSize;
+ private final String[] stationIds;
+ private final HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+ List<Object>[] outputs;
+
+ public WeatherBatchSpout(Fields fields, int batchSize, String[] stationIds) {
+ this.outputFields = fields;
+ this.batchSize = batchSize;
+ this.stationIds = stationIds;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ List<List<Object>> batch = this.batches.get(batchId);
+ if(batch == null){
+ batch = new ArrayList<>();
+ for (int i=0; i< batchSize; i++) {
+ batch.add(createTuple());
+ }
+ this.batches.put(batchId, batch);
+ }
+ for(List<Object> list : batch){
+ collector.emit(list);
+ }
+ }
+
+ private List<Object> createTuple() {
+ final Random random = new Random();
+ List<Object> values = new ArrayList<Object>(){{
+ add(stationIds[random.nextInt(stationIds.length)]);
+ add(random.nextInt(100) + "");
+ add(UUID.randomUUID());}};
+ return values;
+ }
+
+ @Override
+ public void ack(long batchId) {
+ batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return outputFields;
+ }
+
+ public int getRemainingBatches() {
+ return batches.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/resources/schema.cql
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/resources/schema.cql b/external/storm-cassandra/src/test/resources/schema.cql
index 04f8c45..1d131a4 100644
--- a/external/storm-cassandra/src/test/resources/schema.cql
+++ b/external/storm-cassandra/src/test/resources/schema.cql
@@ -16,9 +16,15 @@
*/
CREATE TABLE temperature (
- weatherstation_id TEXT,
+ weather_station_id TEXT,
+ weather_station_name TEXT,
event_time TIMEUUID,
temperature TEXT,
- PRIMARY KEY(weatherstation_id, event_time)
-)
+ PRIMARY KEY(weather_station_id, event_time)
+);
+CREATE TABLE station (
+ id TEXT,
+ name TEXT,
+ PRIMARY KEY(id)
+);
\ No newline at end of file
[3/6] storm git commit: STORM-1211: Added doc in README on how to use
trident state/query APIs
Posted by sr...@apache.org.
STORM-1211: Added doc in README on how to use trident state/query APIs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5cc697b7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5cc697b7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5cc697b7
Branch: refs/heads/master
Commit: 5cc697b745fd5f0cfaf940da74d963654efca228
Parents: ef9479d
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 3 17:44:47 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Dec 8 11:40:23 2015 +0530
----------------------------------------------------------------------
external/storm-cassandra/README.md | 28 ++++++++++++++++++++++++++++
external/storm-cassandra/pom.xml | 9 +++++++++
2 files changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5cc697b7/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index fb77425..943fe5e 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -178,6 +178,34 @@ builder.setBolt("BOLT_WRITER", bolt, 4)
.customGrouping("spout", new Murmur3StreamGrouping("title"))
```
+### Trident API support
+storm-cassandra support Trident `state` API for `inserting` data into Cassandra.
+```java
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+ "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+ .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);
+ TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+ stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+ stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+ stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+```
+
+Below `state` API for `querying` data from Cassandra.
+```java
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+ .bind(with(field("weather_station_id").as("id")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+ CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);
+ CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
+ TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+ stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+```
+
## License
Licensed to the Apache Software Foundation (ASF) under one
http://git-wip-us.apache.org/repos/asf/storm/blob/5cc697b7/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 86cd87d..446b18b 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -51,6 +51,15 @@
<role>developer</role>
</roles>
</developer>
+ <developer>
+ <id>satishd</id>
+ <name>Satish Duggana</name>
+ <email>satish.duggana@gmail.com</email>
+ <url>https://github.com/satishd</url>
+ <roles>
+ <role>developer</role>
+ </roles>
+ </developer>
</developers>
<build>
[4/6] storm git commit: Merge remote-tracking branch
'upstream/master' into STORM-1211
Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into STORM-1211
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cc009d4e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cc009d4e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cc009d4e
Branch: refs/heads/master
Commit: cc009d4eb44e0d5e644413d542742a7dc273ef45
Parents: 4c56eb7 ceb3a0c
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 10 10:17:58 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Thu Dec 10 10:17:58 2015 +0530
----------------------------------------------------------------------
CHANGELOG.md | 4 +
README.markdown | 5 +-
bin/storm.py | 16 +-
external/sql/README.md | 117 +++++
external/sql/pom.xml | 44 ++
external/sql/storm-sql-core/pom.xml | 261 ++++++++++
.../sql/storm-sql-core/src/codegen/config.fmpp | 23 +
.../storm-sql-core/src/codegen/data/Parser.tdd | 64 +++
.../src/codegen/includes/license.ftl | 17 +
.../src/codegen/includes/parserImpls.ftl | 86 ++++
.../src/jvm/org/apache/storm/sql/StormSql.java | 54 +++
.../jvm/org/apache/storm/sql/StormSqlImpl.java | 187 ++++++++
.../org/apache/storm/sql/StormSqlRunner.java | 44 ++
.../apache/storm/sql/compiler/CompilerUtil.java | 168 +++++++
.../apache/storm/sql/compiler/ExprCompiler.java | 471 +++++++++++++++++++
.../sql/compiler/PostOrderRelNodeVisitor.java | 122 +++++
.../backends/standalone/PlanCompiler.java | 132 ++++++
.../backends/standalone/RelNodeCompiler.java | 111 +++++
.../compiler/backends/trident/PlanCompiler.java | 201 ++++++++
.../backends/trident/RelNodeCompiler.java | 116 +++++
.../storm/sql/javac/CompilingClassLoader.java | 225 +++++++++
.../storm/sql/parser/ColumnConstraint.java | 42 ++
.../storm/sql/parser/ColumnDefinition.java | 44 ++
.../apache/storm/sql/parser/SqlCreateTable.java | 136 ++++++
.../apache/storm/sql/parser/SqlDDLKeywords.java | 27 ++
.../apache/storm/sql/parser/StormParser.java | 42 ++
.../apache/storm/sql/parser/UnparseUtil.java | 60 +++
.../test/org/apache/storm/sql/TestStormSql.java | 82 ++++
.../storm/sql/compiler/TestCompilerUtils.java | 64 +++
.../storm/sql/compiler/TestExprCompiler.java | 93 ++++
.../storm/sql/compiler/TestExprSemantic.java | 140 ++++++
.../backends/standalone/TestPlanCompiler.java | 69 +++
.../standalone/TestRelNodeCompiler.java | 62 +++
.../backends/trident/TestPlanCompiler.java | 116 +++++
.../apache/storm/sql/parser/TestSqlParser.java | 48 ++
external/sql/storm-sql-kafka/pom.xml | 111 +++++
.../org/apache/storm/sql/kafka/JsonScheme.java | 58 +++
.../apache/storm/sql/kafka/JsonSerializer.java | 56 +++
.../sql/kafka/KafkaDataSourcesProvider.java | 205 ++++++++
...apache.storm.sql.runtime.DataSourcesProvider | 16 +
.../storm/sql/kafka/TestJsonRepresentation.java | 50 ++
.../sql/kafka/TestKafkaDataSourcesProvider.java | 114 +++++
external/sql/storm-sql-runtime/pom.xml | 73 +++
.../sql/runtime/AbstractChannelHandler.java | 44 ++
.../sql/runtime/AbstractValuesProcessor.java | 49 ++
.../storm/sql/runtime/ChannelContext.java | 30 ++
.../storm/sql/runtime/ChannelHandler.java | 39 ++
.../org/apache/storm/sql/runtime/Channels.java | 80 ++++
.../apache/storm/sql/runtime/DataSource.java | 29 ++
.../storm/sql/runtime/DataSourcesProvider.java | 49 ++
.../storm/sql/runtime/DataSourcesRegistry.java | 78 +++
.../org/apache/storm/sql/runtime/FieldInfo.java | 45 ++
.../storm/sql/runtime/IOutputSerializer.java | 31 ++
.../sql/runtime/ISqlTridentDataSource.java | 30 ++
.../storm/sql/runtime/StormSqlFunctions.java | 36 ++
.../trident/AbstractTridentProcessor.java | 43 ++
.../test/org/apache/storm/sql/TestUtils.java | 163 +++++++
.../jvm/storm/kafka/ByteBufferSerializer.java | 41 ++
.../src/jvm/storm/kafka/IntSerializer.java | 42 ++
pom.xml | 17 +
.../src/clj/backtype/storm/daemon/drpc.clj | 6 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 6 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 8 +-
.../clj/backtype/storm/daemon/supervisor.clj | 8 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 8 +-
.../org/apache/storm/pacemaker/pacemaker.clj | 8 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 1 +
.../src/jvm/backtype/storm/utils/Utils.java | 15 +-
.../storm/windowing/WindowManagerTest.java | 22 +-
storm-dist/binary/pom.xml | 10 +
storm-dist/binary/src/main/assembly/binary.xml | 25 +-
71 files changed, 5116 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
[5/6] storm git commit: Merge branch 'STORM-1211' of
https://github.com/satishd/storm into STORM-1211
Posted by sr...@apache.org.
Merge branch 'STORM-1211' of https://github.com/satishd/storm into STORM-1211
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0be444ea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0be444ea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0be444ea
Branch: refs/heads/master
Commit: 0be444ea832707f76747534aaa39eb7be214942a
Parents: 8482026 cc009d4
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Dec 10 21:28:14 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Dec 10 21:28:14 2015 -0800
----------------------------------------------------------------------
external/storm-cassandra/README.md | 28 ++++
external/storm-cassandra/pom.xml | 9 ++
.../cassandra/client/impl/DefaultClient.java | 22 +--
.../query/CQLResultSetValuesMapper.java | 36 +++++
.../query/impl/BoundStatementMapperBuilder.java | 8 +-
.../cassandra/trident/state/CassandraQuery.java | 44 ++++++
.../cassandra/trident/state/CassandraState.java | 158 +++++++++++++++++++
.../trident/state/CassandraStateFactory.java | 51 ++++++
.../trident/state/CassandraStateUpdater.java | 36 +++++
.../state/TridentResultSetValuesMapper.java | 63 ++++++++
.../cassandra/DynamicStatementBuilderTest.java | 46 +++---
.../apache/storm/cassandra/WeatherSpout.java | 4 +-
.../bolt/BatchCassandraWriterBoltTest.java | 13 +-
.../cassandra/bolt/CassandraWriterBoltTest.java | 13 +-
.../cassandra/trident/TridentTopologyTest.java | 125 +++++++++++++++
.../cassandra/trident/WeatherBatchSpout.java | 104 ++++++++++++
.../src/test/resources/schema.cql | 12 +-
17 files changed, 725 insertions(+), 47 deletions(-)
----------------------------------------------------------------------