You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 22:09:14 UTC
storm git commit: Removing storm-cassandra tests because of LGPL test
dependency.
Repository: storm
Updated Branches:
refs/heads/master 8bfceac22 -> 3db968092
Removing storm-cassandra tests because of LGPL test dependency.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3db96809
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3db96809
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3db96809
Branch: refs/heads/master
Commit: 3db968092077363bd766db516b9e1135273672bf
Parents: 8bfceac
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 11 15:08:33 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 11 15:08:33 2016 -0600
----------------------------------------------------------------------
external/storm-cassandra/pom.xml | 7 -
.../cassandra/DynamicStatementBuilderTest.java | 163 -------------------
.../storm/cassandra/bolt/BaseTopologyTest.java | 60 -------
.../bolt/BatchCassandraWriterBoltTest.java | 66 --------
.../cassandra/bolt/CassandraWriterBoltTest.java | 67 --------
.../cassandra/trident/TridentTopologyTest.java | 125 --------------
6 files changed, 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 3612462..24b3e2a 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -122,12 +122,5 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.cassandraunit</groupId>
- <artifactId>cassandra-unit</artifactId>
- <version>2.1.3.1</version>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
deleted file mode 100644
index f96dae8..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra;
-
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.collect.Maps;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
-import org.cassandraunit.CassandraCQLUnit;
-import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
-import static org.mockito.Mockito.when;
-
-public class DynamicStatementBuilderTest {
-
- private static final Date NOW = new Date();
-
- private static final Tuple mockTuple;
-
- public static final String COL_WEATHER_STATION_ID = "weather_station_id";
- public static final String COL_EVENT_TIME = "event_time";
- public static final String COL_TEMPERATURE = "temperature";
-
- @Rule
- public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
-
- static {
- mockTuple = Mockito.mock(Tuple.class);
- when(mockTuple.getValueByField(COL_WEATHER_STATION_ID)).thenReturn("1");
- when(mockTuple.getValueByField(COL_EVENT_TIME)).thenReturn(NOW);
- when(mockTuple.getValueByField(COL_TEMPERATURE)).thenReturn("0°C");
- when(mockTuple.getFields()).thenReturn(new Fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE));
- }
-
- public static final String QUERY_STRING = "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?,?,?);";
-
- @Test
- public void shouldBuildMultipleStaticInsertStatementGivenKeyspaceAndAllMapper() {
- CQLStatementTupleMapper mapper = async(
- simpleQuery(QUERY_STRING).with(all()),
- simpleQuery(QUERY_STRING).with(all())
- );
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(2, stmts.size());
- makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
- makeAssertSimpleStatement(QUERY_STRING, stmts.get(1));
- }
-
- @Test
- public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
- SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(1, stmts.size());
- makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
- }
-
- @Test
- public void shouldBuildStaticInsertStatementUsingBuilderGivenKeyspaceAndAllMapper() {
- Insert insert = QueryBuilder.insertInto("weather", "temperature")
- .value(COL_WEATHER_STATION_ID, "?")
- .value(COL_EVENT_TIME, "?")
- .value(COL_TEMPERATURE, "?");
- SimpleCQLStatementMapper mapper = simpleQuery(insert).with(all()).build();
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(1, stmts.size());
- makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
- }
-
- @Test
- public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
- SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(1, stmts.size());
- makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
- }
-
- @Test
- public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
- SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)).build();
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(1, stmts.size());
- makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
- }
-
- @Test
- public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
- BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(1, stmts.size());
- makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
- }
-
- @Test
- public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
- BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
- List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Assert.assertEquals(1, stmts.size());
- makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
- }
-
- @Test
- public void shouldBuildStaticBoundStatement() {
- CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field(COL_WEATHER_STATION_ID), field(COL_EVENT_TIME).now(), field(COL_TEMPERATURE)));
- List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
- Statement statement = map.get(0);
- Assert.assertNotNull(statement);
- }
-
- public void makeAssertSimpleStatement(String expected, Statement actual) {
- Assert.assertTrue(actual instanceof SimpleStatement);
- Assert.assertEquals(expected, actual.toString());
- Assert.assertEquals(3, ((SimpleStatement)actual).getValues(ProtocolVersion.V3).length);
- }
-
- public void makeAssertBatchStatement(String expected, Statement actual) {
- Assert.assertTrue(actual instanceof BatchStatement);
- Collection<Statement> statements = ((BatchStatement) actual).getStatements();
- for(Statement s : statements) {
- Assert.assertEquals(expected, s.toString());
- Assert.assertEquals(3, ((SimpleStatement)s).getValues(ProtocolVersion.V3).length);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
deleted file mode 100644
index a6d47c9..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.bolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-import org.cassandraunit.CassandraCQLUnit;
-import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
-import org.apache.storm.LocalCluster;
-
-import org.junit.Rule;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public abstract class BaseTopologyTest {
-
- @Rule
- public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather"));
-
- protected void runLocalTopologyAndWait(TopologyBuilder builder) {
- LocalCluster cluster = new LocalCluster();
- StormTopology topology = builder.createTopology();
- Config config = getConfig();
- cluster.submitTopology("my-cassandra-topology", config, topology);
-
- Utils.sleep(TimeUnit.SECONDS.toMillis(15));
-
- cluster.killTopology("my-cassandra-topology");
- cluster.shutdown();
- }
-
- protected Config getConfig() {
- Config config = new Config();
- config.put("cassandra.keyspace", "weather");
- config.put("cassandra.nodes", "localhost");
- config.put("cassandra.port", "9142");
- return config;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
deleted file mode 100644
index d0471f4..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.bolt;
-
-import org.apache.storm.topology.TopologyBuilder;
-import com.datastax.driver.core.ResultSet;
-import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
-
-
-/**
- *
- */
-public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
-
- public static final String SPOUT_MOCK = "spout-mock";
- public static final String BOLT_WRITER = "writer";
-
- @Test
- @Ignore("The sleep method should be used in tests")
- public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
- executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
- }
-
- private SimpleCQLStatementMapper getInsertInto() {
- return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
- .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
- }
-
- protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1)
- .setMaxTaskParallelism(1);
-
- builder.setBolt(BOLT_WRITER, bolt, 4)
- .shuffleGrouping(SPOUT_MOCK);
-
- runLocalTopologyAndWait(builder);
-
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
- Assert.assertEquals(maxQueries, rows.all().size());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
deleted file mode 100644
index 4e922e3..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.bolt;
-
-import org.apache.storm.topology.TopologyBuilder;
-import com.datastax.driver.core.ResultSet;
-import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
-
-/**
- *
- */
-public class CassandraWriterBoltTest extends BaseTopologyTest {
-
- public static final String SPOUT_MOCK = "spout-mock";
- public static final String BOLT_WRITER = "writer";
-
- @Test
- @Ignore("The sleep method should be used in tests")
- public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
- executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
- }
-
- private SimpleCQLStatementMapper getInsertInto() {
- return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
- .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
- }
-
- protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1)
- .setMaxTaskParallelism(1);
-
- builder.setBolt(BOLT_WRITER, bolt, 4)
- .shuffleGrouping(SPOUT_MOCK);
-
- runLocalTopologyAndWait(builder);
-
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
- Assert.assertEquals(maxQueries, rows.all().size());
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
deleted file mode 100644
index 764a574..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.trident;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import org.apache.storm.cassandra.CassandraContext;
-import org.apache.storm.cassandra.bolt.BaseTopologyTest;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.trident.state.CassandraQuery;
-import org.apache.storm.cassandra.trident.state.CassandraState;
-import org.apache.storm.cassandra.trident.state.CassandraStateFactory;
-import org.apache.storm.cassandra.trident.state.CassandraStateUpdater;
-import org.apache.storm.cassandra.trident.state.TridentResultSetValuesMapper;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.Random;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-
-/**
- *
- */
-public class TridentTopologyTest extends BaseTopologyTest {
-
- @Test
- public void testTridentTopology() throws Exception {
-
- Session session = cassandraCQLUnit.session;
- String[] stationIds = {"station-1", "station-2", "station-3"};
- for (int i = 1; i < 4; i++) {
- ResultSet resultSet = session.execute("INSERT INTO weather.station(id, name) VALUES(?, ?)", stationIds[i-1],
- "Foo-Station-" + new Random().nextInt());
- }
-
- ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.station");
- for (Row row : rows) {
- System.out.println("####### row = " + row);
- }
-
- WeatherBatchSpout weatherBatchSpout =
- new WeatherBatchSpout(new Fields("weather_station_id", "temperature", "event_time"), 3,
- stationIds);
-
- TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("cassandra-trident-stream", weatherBatchSpout);
-
- CassandraStateFactory insertValuesStateFactory = getInsertTemperatureStateFactory();
-
- CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
-
- TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
- stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
- stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
-
- stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
-
- StormTopology stormTopology = topology.build();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", getConfig(), stormTopology);
- Thread.sleep(30 * 1000);
-
- rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature");
- Assert.assertTrue(rows.iterator().hasNext()); // basic sanity check
-
- cluster.killTopology("wordCounter");
- cluster.shutdown();
- }
-
- public static class PrintFunction extends BaseFunction {
-
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- System.out.println("####### tuple = " + tuple.getValues());
- collector.emit(tuple.getValues());
- }
- }
-
- private CassandraStateFactory getInsertTemperatureStateFactory() {
- CassandraState.Options options = new CassandraState.Options(new CassandraContext());
- CQLStatementTupleMapper insertTemperatureValues = boundQuery(
- "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
- .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
- .build();
- options.withCQLStatementTupleMapper(insertTemperatureValues);
- return new CassandraStateFactory(options);
- }
-
- public CassandraStateFactory getSelectWeatherStationStateFactory() {
- CassandraState.Options options = new CassandraState.Options(new CassandraContext());
- CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
- .bind(field("weather_station_id").as("id")).build();
- options.withCQLStatementTupleMapper(insertTemperatureValues);
- options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
- return new CassandraStateFactory(options);
- }
-}