You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/11 07:41:21 UTC
[2/6] storm git commit: STORM-1211 Added trident support for
Cassandra connector.
STORM-1211 Added trident support for Cassandra connector.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef9479d0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef9479d0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef9479d0
Branch: refs/heads/master
Commit: ef9479d06add8f70517d410d7e12a6c21940bd75
Parents: 10b33a6
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 3 09:43:05 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Dec 8 11:40:23 2015 +0530
----------------------------------------------------------------------
.../cassandra/client/impl/DefaultClient.java | 22 +--
.../query/CQLResultSetValuesMapper.java | 36 +++++
.../query/impl/BoundStatementMapperBuilder.java | 8 +-
.../cassandra/trident/state/CassandraQuery.java | 44 ++++++
.../cassandra/trident/state/CassandraState.java | 149 +++++++++++++++++++
.../trident/state/CassandraStateFactory.java | 51 +++++++
.../trident/state/CassandraStateUpdater.java | 36 +++++
.../state/TridentResultSetValuesMapper.java | 63 ++++++++
.../cassandra/DynamicStatementBuilderTest.java | 46 +++---
.../apache/storm/cassandra/WeatherSpout.java | 4 +-
.../bolt/BatchCassandraWriterBoltTest.java | 13 +-
.../cassandra/bolt/CassandraWriterBoltTest.java | 13 +-
.../cassandra/trident/TridentTopologyTest.java | 125 ++++++++++++++++
.../cassandra/trident/WeatherBatchSpout.java | 104 +++++++++++++
.../src/test/resources/schema.cql | 12 +-
15 files changed, 679 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
index 945d0a8..6eb8e16 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
@@ -36,25 +36,25 @@ import java.util.Set;
* Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
*/
public class DefaultClient implements SimpleClient, Closeable, Serializable {
-
+
private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
-
+
private String keyspace;
private Cluster cluster;
-
+
private Session session;
/**
* Create a new {@link DefaultClient} instance.
- *
+ *
* @param cluster a cassandra cluster client.
*/
public DefaultClient(Cluster cluster, String keyspace) {
Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
this.cluster = cluster;
this.keyspace = keyspace;
-
+
}
public Set<Host> getAllHosts() {
@@ -71,14 +71,15 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
Thread thread = Thread.currentThread();
return thread.getName();
}
+
/**
* {@inheritDoc}
*/
@Override
public synchronized Session connect() throws NoHostAvailableException {
- if( isDisconnected() ) {
+ if (isDisconnected()) {
LOG.info("Connected to cluster: {}", cluster.getClusterName());
- for ( Host host : getAllHosts())
+ for (Host host : getAllHosts())
LOG.info("Datacenter: {}; Host: {}; Rack: {}", host.getDatacenter(), host.getAddress(), host.getRack());
LOG.info("Connect to cluster using keyspace %s", keyspace);
@@ -87,7 +88,7 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
LOG.warn("{} - Already connected to cluster: {}", getExecutorName(), cluster.getClusterName());
}
- if( session.isClosed() ) {
+ if (session.isClosed()) {
LOG.warn("Session has been closed - create new one!");
this.session = cluster.newSession();
}
@@ -105,13 +106,14 @@ public class DefaultClient implements SimpleClient, Closeable, Serializable {
* {@inheritDoc}
*/
@Override
- public void close( ) {
- if( cluster != null && !cluster.isClosed() ) {
+ public void close() {
+ if (cluster != null && !cluster.isClosed()) {
LOG.info("Try to close connection to cluster: {}", cluster.getClusterName());
session.close();
cluster.close();
}
}
+
/**
* {@inheritDoc}
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
new file mode 100644
index 0000000..80b1173
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.query;
+
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ *
+ */
+public interface CQLResultSetValuesMapper extends Serializable {
+
+ List<List<Values>> map(Session session, Statement statement, ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
index 4348253..b7ded14 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundStatementMapperBuilder.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.storm.cassandra.query.ContextQuery.*;
+import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery;
public class BoundStatementMapperBuilder implements Serializable {
@@ -64,6 +64,7 @@ public class BoundStatementMapperBuilder implements Serializable {
private final CQLValuesTupleMapper mapper;
+ // should be a lru-map or weakhashmap else this may lead to memory leaks.
private Map<String, PreparedStatement> cache = new HashMap<>();
/**
@@ -86,7 +87,8 @@ public class BoundStatementMapperBuilder implements Serializable {
final String query = contextQuery.resolves(config, tuple);
Object[] objects = values.values().toArray(new Object[values.size()]);
PreparedStatement statement = getPreparedStatement(session, query);
- return Arrays.asList((Statement)statement.bind(objects));
+ // todo bind objects in the same sequence as the statement expects.
+ return Arrays.asList((Statement) statement.bind(objects));
}
/**
@@ -97,7 +99,7 @@ public class BoundStatementMapperBuilder implements Serializable {
*/
private PreparedStatement getPreparedStatement(Session session, String query) {
PreparedStatement statement = cache.get(query);
- if( statement == null) {
+ if (statement == null) {
statement = session.prepare(query);
cache.put(query, statement);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
new file mode 100644
index 0000000..085cbca
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraQuery.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class CassandraQuery extends BaseQueryFunction<CassandraState, List<Values>> {
+
+ @Override
+ public List<List<Values>> batchRetrieve(CassandraState state, List<TridentTuple> tridentTuples) {
+ return state.batchRetrieve(tridentTuples);
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, List<Values> valuesList, TridentCollector collector) {
+ for (Values values : valuesList) {
+ collector.emit(values);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
new file mode 100644
index 0000000..05618ee
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraState.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraState.class);
+
+ private final Map conf;
+ private final Options options;
+
+ private Session session;
+ private SimpleClient client;
+
+ public CassandraState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions, Options options) {
+ this.conf = conf;
+ this.options = options;
+ }
+
+ public static final class Options implements Serializable {
+ private final SimpleClientProvider clientProvider;
+ private CQLStatementTupleMapper cqlStatementTupleMapper;
+ private CQLResultSetValuesMapper cqlResultSetValuesMapper;
+ private BatchStatement.Type batchingType = BatchStatement.Type.LOGGED;
+
+
+ public Options(SimpleClientProvider clientProvider) {
+ this.clientProvider = clientProvider;
+ }
+
+ public Options withCQLStatementTupleMapper(CQLStatementTupleMapper cqlStatementTupleMapper) {
+ this.cqlStatementTupleMapper = cqlStatementTupleMapper;
+ return this;
+ }
+
+ public Options withCQLResultSetValuesMapper(CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+ this.cqlResultSetValuesMapper = cqlResultSetValuesMapper;
+ return this;
+ }
+
+ public Options withBatching(BatchStatement.Type batchingType) {
+ this.batchingType = batchingType;
+ return this;
+ }
+
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is noop");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is noop");
+ }
+
+ public void prepare() {
+ client = options.clientProvider.getClient(conf);
+ session = client.connect();
+ }
+
+ public void cleanup() {
+ session.close();
+ client.close();
+ }
+
+ public void updateState(List<TridentTuple> tuples, final TridentCollector collector) {
+ Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+
+ List<Statement> statements = new ArrayList<>();
+ for (TridentTuple tuple : tuples) {
+ statements.addAll(options.cqlStatementTupleMapper.map(conf, session, tuple));
+ }
+ BatchStatement batchStatement = new BatchStatement(options.batchingType);
+ batchStatement.addAll(statements);
+
+ try {
+ session.execute(batchStatement);
+ } catch (Exception e) {
+ LOG.warn("Batch write operation is failed.");
+ collector.reportError(e);
+ throw new FailedException(e);
+ }
+
+ }
+
+ public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+ Preconditions.checkNotNull(options.cqlStatementTupleMapper, "CassandraState.Options should have cqlStatementTupleMapper");
+ Preconditions.checkNotNull(options.cqlResultSetValuesMapper, "CassandraState.Options should have cqlResultSetValuesMapper");
+
+ List<List<Values>> batchRetrieveResult = new ArrayList<>();
+ try {
+ for (TridentTuple tridentTuple : tridentTuples) {
+ List<Statement> statements = options.cqlStatementTupleMapper.map(conf, session, tridentTuple);
+ for (Statement statement : statements) {
+ List<List<Values>> values = options.cqlResultSetValuesMapper.map(session, statement, tridentTuple);
+ batchRetrieveResult.addAll(values);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch retrieve operation is failed.");
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
new file mode 100644
index 0000000..285fa38
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class CassandraStateFactory implements StateFactory {
+ private final CassandraState.Options options;
+
+ public CassandraStateFactory(CassandraState.Options options) {
+ this.options = options;
+ }
+
+ public CassandraStateFactory(CQLStatementTupleMapper cqlStatementTupleMapper, CQLResultSetValuesMapper cqlResultSetValuesMapper) {
+ this(new CassandraState.Options(new CassandraContext()).withCQLStatementTupleMapper(cqlStatementTupleMapper).withCQLResultSetValuesMapper(cqlResultSetValuesMapper));
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ CassandraState cassandraState = new CassandraState(conf, metrics, partitionIndex, numPartitions, options);
+ cassandraState.prepare();
+
+ return cassandraState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
new file mode 100644
index 0000000..6453198
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraStateUpdater.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class CassandraStateUpdater extends BaseStateUpdater<CassandraState> {
+
+ @Override
+ public void updateState(CassandraState state, List<TridentTuple> list, TridentCollector collector) {
+ state.updateState(list, collector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
new file mode 100644
index 0000000..be7bda1
--- /dev/null
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/TridentResultSetValuesMapper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident.state;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.query.CQLResultSetValuesMapper;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *
+ */
+public class TridentResultSetValuesMapper implements CQLResultSetValuesMapper {
+ private Fields outputDeclaredFields;
+
+ public TridentResultSetValuesMapper(Fields outputDeclaredFields) {
+ this.outputDeclaredFields = outputDeclaredFields;
+ }
+
+ @Override
+ public List<List<Values>> map(Session session, Statement statement, ITuple tuple) {
+ List<List<Values>> list = new ArrayList<>();
+ ResultSet resultSet = session.execute(statement);
+ for (Row row : resultSet) {
+ final Values values = new Values();
+ for (String field : outputDeclaredFields) {
+ if (tuple.contains(field)) {
+ values.add(tuple.getValueByField(field));
+ } else {
+ values.add(row.getObject(field));
+ }
+ }
+ list.add(new LinkedList<Values>() {{
+ add(values);
+ }});
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
index 22e12da..843bf6b 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
@@ -36,7 +36,15 @@ import java.util.Collection;
import java.util.Date;
import java.util.List;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
import static org.mockito.Mockito.when;
public class DynamicStatementBuilderTest {
@@ -46,14 +54,14 @@ public class DynamicStatementBuilderTest {
private static final Tuple mockTuple;
@Rule
- public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather"));
+ public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
static {
mockTuple = Mockito.mock(Tuple.class);
- when(mockTuple.getValueByField("weatherstation_id")).thenReturn("1");
+ when(mockTuple.getValueByField("weather_station_id")).thenReturn("1");
when(mockTuple.getValueByField("event_time")).thenReturn(NOW);
when(mockTuple.getValueByField("temperature")).thenReturn("0°C");
- when(mockTuple.getFields()).thenReturn(new Fields("weatherstation_id", "event_time", "temperature"));
+ when(mockTuple.getFields()).thenReturn(new Fields("weather_station_id", "event_time", "temperature"));
}
@Test
@@ -63,51 +71,51 @@ public class DynamicStatementBuilderTest {
insertInto("weather", "temperature").values(all()),
insertInto("weather", "temperature").values(all())
),
- "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
- "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
+ "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');",
+ "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');"
);
}
@Test
public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
executeStatementAndAssert(insertInto("weather", "temperature").values(all()).build(),
- "INSERT INTO weather.temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
executeStatementAndAssert(insertInto("temperature").values(all()).build(),
- "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
- executeStatementAndAssert(insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature"))).build(),
- "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ executeStatementAndAssert(insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature"))).build(),
+ "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
executeBatchStatementAndAssert(loggedBatch(
- insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature")))
- ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
+ ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
@Test
public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
executeBatchStatementAndAssert(unLoggedBatch(
- insertInto("temperature").values(with(fields("weatherstation_id", "event_time", "temperature")))
- ), "INSERT INTO temperature(weatherstation_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
+ insertInto("temperature").values(with(fields("weather_station_id", "event_time", "temperature")))
+ ), "INSERT INTO temperature(weather_station_id,event_time,temperature) VALUES ('1'," + NOW.getTime() + ",'0°C');");
}
private void executeBatchStatementAndAssert(CQLStatementTupleMapper mapper, String... results) {
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- BatchStatement statement = (BatchStatement)map.get(0);
+ BatchStatement statement = (BatchStatement) map.get(0);
Collection<Statement> statements = statement.getStatements();
Assert.assertEquals(results.length, statements.size());
- for(Statement s : statements)
+ for (Statement s : statements)
Assert.assertTrue(Arrays.asList(results).contains(s.toString()));
}
@@ -116,15 +124,15 @@ public class DynamicStatementBuilderTest {
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
List<String> listExpected = Arrays.asList(expected);
- for( int i=0; i< map.size(); i++) {
+ for (int i = 0; i < map.size(); i++) {
Assert.assertEquals(listExpected.get(i), map.get(i).toString());
}
}
@Test
public void shouldBuildStaticBoundStatement() {
- CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weatherstation_id, event_time, temperature) VALUES(?, ?, ?)")
- .bind(with(field("weatherstation_id"), field("event_time").now(), field("temperature")));
+ CQLStatementTupleMapper mapper = boundQuery("INSERT INTO weather.temperature(weather_station_id, event_time, temperature) VALUES(?, ?, ?)")
+ .bind(with(field("weather_station_id"), field("event_time").now(), field("temperature")));
List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
Statement statement = map.get(0);
Assert.assertNotNull(statement);
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
index 49a1f80..0b7d8a5 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/WeatherSpout.java
@@ -52,7 +52,7 @@ public class WeatherSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("weatherstation_id", "temperature"));
+ outputFieldsDeclarer.declare(new Fields("weather_station_id", "temperature"));
}
@Override
@@ -77,7 +77,7 @@ public class WeatherSpout extends BaseRichSpout {
@Override
public void nextTuple() {
- if( emit.get() < maxQueries.get() ) {
+ if (emit.get() < maxQueries.get()) {
spoutOutputCollector.emit(new Values(stationID, "38°C"), emit.incrementAndGet());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
index 4253189..80eb95a 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
@@ -26,7 +26,9 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
/**
@@ -34,16 +36,17 @@ import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
*/
public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
- public static final String SPOUT_MOCK = "spout-mock";
+ public static final String SPOUT_MOCK = "spout-mock";
public static final String BOLT_WRITER = "writer";
- @Test @Ignore("The sleep method should be used in tests")
+ @Test
+ @Ignore("The sleep method should be used in tests")
public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
}
private SimpleCQLStatementTupleMapper getInsertInto() {
- return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build();
+ return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
@@ -57,7 +60,7 @@ public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
runLocalTopologyAndWait(builder);
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'");
+ ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
Assert.assertEquals(maxQueries, rows.all().size());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
index 7717d4d..2aef53c 100644
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
@@ -26,23 +26,26 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.insertInto;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
/**
*
*/
public class CassandraWriterBoltTest extends BaseTopologyTest {
- public static final String SPOUT_MOCK = "spout-mock";
+ public static final String SPOUT_MOCK = "spout-mock";
public static final String BOLT_WRITER = "writer";
- @Test @Ignore("The sleep method should be used in tests")
+ @Test
+ @Ignore("The sleep method should be used in tests")
public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
}
private SimpleCQLStatementTupleMapper getInsertInto() {
- return insertInto("weather", "temperature").values(with(field("weatherstation_id"), field("event_time").now(), field("temperature"))).build();
+ return insertInto("weather", "temperature").values(with(field("weather_station_id"), field("event_time").now(), field("temperature"))).build();
}
protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
@@ -56,7 +59,7 @@ public class CassandraWriterBoltTest extends BaseTopologyTest {
runLocalTopologyAndWait(builder);
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weatherstation_id='test'");
+ ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
Assert.assertEquals(maxQueries, rows.all().size());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
new file mode 100644
index 0000000..c775919
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.bolt.BaseTopologyTest;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.apache.storm.cassandra.trident.state.CassandraQuery;
+import org.apache.storm.cassandra.trident.state.CassandraState;
+import org.apache.storm.cassandra.trident.state.CassandraStateFactory;
+import org.apache.storm.cassandra.trident.state.CassandraStateUpdater;
+import org.apache.storm.cassandra.trident.state.TridentResultSetValuesMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
+import static org.apache.storm.cassandra.DynamicStatementBuilder.with;
+
+/**
+ *
+ */
+public class TridentTopologyTest extends BaseTopologyTest {
+
+ @Test
+ public void testTridentTopology() throws Exception {
+
+ Session session = cassandraCQLUnit.session;
+ String[] stationIds = {"station-1", "station-2", "station-3"};
+ for (int i = 1; i < 4; i++) {
+ ResultSet resultSet = session.execute("INSERT INTO weather.station(id, name) VALUES(?, ?)", stationIds[i-1],
+ "Foo-Station-" + new Random().nextInt());
+ }
+
+ ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.station");
+ for (Row row : rows) {
+ System.out.println("####### row = " + row);
+ }
+
+ WeatherBatchSpout weatherBatchSpout =
+ new WeatherBatchSpout(new Fields("weather_station_id", "temperature", "event_time"), 3,
+ stationIds);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("cassandra-trident-stream", weatherBatchSpout);
+
+ CassandraStateFactory insertValuesStateFactory = getInsertTemperatureStateFactory();
+
+ CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
+
+ TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+ stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+ stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+
+ stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+
+ StormTopology stormTopology = topology.build();
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("wordCounter", getConfig(), stormTopology);
+ Thread.sleep(30 * 1000);
+
+ rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature");
+ Assert.assertTrue(rows.iterator().hasNext()); // basic sanity check
+
+ cluster.killTopology("wordCounter");
+ cluster.shutdown();
+ }
+
+ public static class PrintFunction extends BaseFunction {
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ System.out.println("####### tuple = " + tuple.getValues());
+ collector.emit(tuple.getValues());
+ }
+ }
+
+ private CassandraStateFactory getInsertTemperatureStateFactory() {
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+ "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+ .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ return new CassandraStateFactory(options);
+ }
+
+ public CassandraStateFactory getSelectWeatherStationStateFactory() {
+ CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+ CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+ .bind(with(field("weather_station_id").as("id")));
+ options.withCQLStatementTupleMapper(insertTemperatureValues);
+ options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+ return new CassandraStateFactory(options);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
new file mode 100644
index 0000000..62f31db
--- /dev/null
+++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.trident;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class WeatherBatchSpout implements IBatchSpout {
+ private final Fields outputFields;
+ private final int batchSize;
+ private final String[] stationIds;
+ private final HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
+ List<Object>[] outputs;
+
+ public WeatherBatchSpout(Fields fields, int batchSize, String[] stationIds) {
+ this.outputFields = fields;
+ this.batchSize = batchSize;
+ this.stationIds = stationIds;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ List<List<Object>> batch = this.batches.get(batchId);
+ if(batch == null){
+ batch = new ArrayList<>();
+ for (int i=0; i< batchSize; i++) {
+ batch.add(createTuple());
+ }
+ this.batches.put(batchId, batch);
+ }
+ for(List<Object> list : batch){
+ collector.emit(list);
+ }
+ }
+
+ private List<Object> createTuple() {
+ final Random random = new Random();
+ List<Object> values = new ArrayList<Object>(){{
+ add(stationIds[random.nextInt(stationIds.length)]);
+ add(random.nextInt(100) + "");
+ add(UUID.randomUUID());}};
+ return values;
+ }
+
+ @Override
+ public void ack(long batchId) {
+ batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return outputFields;
+ }
+
+ public int getRemainingBatches() {
+ return batches.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ef9479d0/external/storm-cassandra/src/test/resources/schema.cql
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/resources/schema.cql b/external/storm-cassandra/src/test/resources/schema.cql
index 04f8c45..1d131a4 100644
--- a/external/storm-cassandra/src/test/resources/schema.cql
+++ b/external/storm-cassandra/src/test/resources/schema.cql
@@ -16,9 +16,15 @@
*/
CREATE TABLE temperature (
- weatherstation_id TEXT,
+ weather_station_id TEXT,
+ weather_station_name TEXT,
event_time TIMEUUID,
temperature TEXT,
- PRIMARY KEY(weatherstation_id, event_time)
-)
+ PRIMARY KEY(weather_station_id, event_time)
+);
+CREATE TABLE station (
+ id TEXT,
+ name TEXT,
+ PRIMARY KEY(id)
+);
\ No newline at end of file