You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 22:09:14 UTC

storm git commit: Removing storm-cassandra tests because of lgpl test dependency.

Repository: storm
Updated Branches:
  refs/heads/master 8bfceac22 -> 3db968092


Removing storm-cassandra tests because of lgpl test dependency.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3db96809
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3db96809
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3db96809

Branch: refs/heads/master
Commit: 3db968092077363bd766db516b9e1135273672bf
Parents: 8bfceac
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 11 15:08:33 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 11 15:08:33 2016 -0600

----------------------------------------------------------------------
 external/storm-cassandra/pom.xml                |   7 -
 .../cassandra/DynamicStatementBuilderTest.java  | 163 -------------------
 .../storm/cassandra/bolt/BaseTopologyTest.java  |  60 -------
 .../bolt/BatchCassandraWriterBoltTest.java      |  66 --------
 .../cassandra/bolt/CassandraWriterBoltTest.java |  67 --------
 .../cassandra/trident/TridentTopologyTest.java  | 125 --------------
 6 files changed, 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 3612462..24b3e2a 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -122,12 +122,5 @@
             <version>1.10.19</version>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.cassandraunit</groupId>
-            <artifactId>cassandra-unit</artifactId>
-            <version>2.1.3.1</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
deleted file mode 100644
index f96dae8..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/DynamicStatementBuilderTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra;
-
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.querybuilder.Insert;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.collect.Maps;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
-import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
-import org.cassandraunit.CassandraCQLUnit;
-import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.async;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.fields;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.loggedBatch;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.unLoggedBatch;
-import static org.mockito.Mockito.when;
-
-public class DynamicStatementBuilderTest {
-
-    private static final Date NOW = new Date();
-
-    private static final Tuple mockTuple;
-
-    public static final String COL_WEATHER_STATION_ID = "weather_station_id";
-    public static final String COL_EVENT_TIME  = "event_time";
-    public static final String COL_TEMPERATURE = "temperature";
-
-    @Rule
-    public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql", "weather"));
-
-    static {
-        mockTuple = Mockito.mock(Tuple.class);
-        when(mockTuple.getValueByField(COL_WEATHER_STATION_ID)).thenReturn("1");
-        when(mockTuple.getValueByField(COL_EVENT_TIME)).thenReturn(NOW);
-        when(mockTuple.getValueByField(COL_TEMPERATURE)).thenReturn("0°C");
-        when(mockTuple.getFields()).thenReturn(new Fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE));
-    }
-
-    public static final String QUERY_STRING = "INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?,?,?);";
-
-    @Test
-    public void shouldBuildMultipleStaticInsertStatementGivenKeyspaceAndAllMapper() {
-        CQLStatementTupleMapper mapper = async(
-                simpleQuery(QUERY_STRING).with(all()),
-                simpleQuery(QUERY_STRING).with(all())
-        );
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(2, stmts.size());
-        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
-        makeAssertSimpleStatement(QUERY_STRING, stmts.get(1));
-    }
-
-    @Test
-    public void shouldBuildStaticInsertStatementGivenKeyspaceAndAllMapper() {
-        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(1, stmts.size());
-        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
-    }
-
-    @Test
-    public void shouldBuildStaticInsertStatementUsingBuilderGivenKeyspaceAndAllMapper() {
-        Insert insert = QueryBuilder.insertInto("weather", "temperature")
-                .value(COL_WEATHER_STATION_ID, "?")
-                .value(COL_EVENT_TIME, "?")
-                .value(COL_TEMPERATURE, "?");
-        SimpleCQLStatementMapper mapper = simpleQuery(insert).with(all()).build();
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(1, stmts.size());
-        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
-    }
-
-    @Test
-    public void shouldBuildStaticInsertStatementGivenNoKeyspaceAllMapper() {
-        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(all()).build();
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(1, stmts.size());
-        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
-    }
-
-    @Test
-    public void shouldBuildStaticInsertStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        SimpleCQLStatementMapper mapper = simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)).build();
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(1, stmts.size());
-        makeAssertSimpleStatement(QUERY_STRING, stmts.get(0));
-    }
-
-    @Test
-    public void shouldBuildStaticLoggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        BatchCQLStatementTupleMapper mapper = loggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(1, stmts.size());
-        makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
-    }
-
-    @Test
-    public void shouldBuildStaticUnloggedBatchStatementGivenNoKeyspaceAndWithFieldsMapper() {
-        BatchCQLStatementTupleMapper mapper = unLoggedBatch(simpleQuery(QUERY_STRING).with(fields(COL_WEATHER_STATION_ID, COL_EVENT_TIME, COL_TEMPERATURE)));
-        List<Statement> stmts = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Assert.assertEquals(1, stmts.size());
-        makeAssertBatchStatement(QUERY_STRING, stmts.get(0));
-    }
-
-    @Test
-    public void shouldBuildStaticBoundStatement() {
-        CQLStatementTupleMapper mapper = async(boundQuery(QUERY_STRING).bind(field(COL_WEATHER_STATION_ID), field(COL_EVENT_TIME).now(), field(COL_TEMPERATURE)));
-        List<Statement> map = mapper.map(Maps.newHashMap(), cassandraCQLUnit.session, mockTuple);
-        Statement statement = map.get(0);
-        Assert.assertNotNull(statement);
-    }
-
-    public void makeAssertSimpleStatement(String expected, Statement actual) {
-        Assert.assertTrue(actual instanceof SimpleStatement);
-        Assert.assertEquals(expected, actual.toString());
-        Assert.assertEquals(3, ((SimpleStatement)actual).getValues(ProtocolVersion.V3).length);
-    }
-
-    public void makeAssertBatchStatement(String expected, Statement actual) {
-        Assert.assertTrue(actual instanceof BatchStatement);
-        Collection<Statement> statements = ((BatchStatement) actual).getStatements();
-        for(Statement s : statements) {
-            Assert.assertEquals(expected, s.toString());
-            Assert.assertEquals(3, ((SimpleStatement)s).getValues(ProtocolVersion.V3).length);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
deleted file mode 100644
index a6d47c9..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BaseTopologyTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.bolt;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.Utils;
-import org.cassandraunit.CassandraCQLUnit;
-import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
-import org.apache.storm.LocalCluster;
-
-import org.junit.Rule;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public abstract class BaseTopologyTest {
-
-    @Rule
-    public CassandraCQLUnit cassandraCQLUnit = new CassandraCQLUnit(new ClassPathCQLDataSet("schema.cql","weather"));
-
-    protected void runLocalTopologyAndWait(TopologyBuilder builder) {
-        LocalCluster cluster = new LocalCluster();
-        StormTopology topology = builder.createTopology();
-        Config config = getConfig();
-        cluster.submitTopology("my-cassandra-topology", config, topology);
-
-        Utils.sleep(TimeUnit.SECONDS.toMillis(15));
-
-        cluster.killTopology("my-cassandra-topology");
-        cluster.shutdown();
-    }
-
-    protected Config getConfig() {
-        Config config = new Config();
-        config.put("cassandra.keyspace", "weather");
-        config.put("cassandra.nodes", "localhost");
-        config.put("cassandra.port", "9142");
-        return config;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
deleted file mode 100644
index d0471f4..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBoltTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.bolt;
-
-import org.apache.storm.topology.TopologyBuilder;
-import com.datastax.driver.core.ResultSet;
-import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
-
-
-/**
- *
- */
-public class BatchCassandraWriterBoltTest extends BaseTopologyTest {
-
-    public static final String SPOUT_MOCK = "spout-mock";
-    public static final String BOLT_WRITER = "writer";
-
-    @Test
-    @Ignore("The sleep method should be used in tests")
-    public void shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
-        executeAndAssertWith(100000, new BatchCassandraWriterBolt(getInsertInto()));
-    }
-
-    private SimpleCQLStatementMapper getInsertInto() {
-        return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
-                .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
-    }
-
-    protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1)
-                .setMaxTaskParallelism(1);
-
-        builder.setBolt(BOLT_WRITER, bolt, 4)
-                .shuffleGrouping(SPOUT_MOCK);
-
-        runLocalTopologyAndWait(builder);
-
-        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
-        Assert.assertEquals(maxQueries, rows.all().size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
deleted file mode 100644
index 4e922e3..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/bolt/CassandraWriterBoltTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.bolt;
-
-import org.apache.storm.topology.TopologyBuilder;
-import com.datastax.driver.core.ResultSet;
-import org.apache.storm.cassandra.WeatherSpout;
-import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.simpleQuery;
-
-/**
- *
- */
-public class CassandraWriterBoltTest extends BaseTopologyTest {
-
-    public static final String SPOUT_MOCK = "spout-mock";
-    public static final String BOLT_WRITER = "writer";
-
-    @Test
-    @Ignore("The sleep method should be used in tests")
-    public void shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields() {
-        executeAndAssertWith(100000, new CassandraWriterBolt((getInsertInto())));
-    }
-
-    private SimpleCQLStatementMapper getInsertInto() {
-        return simpleQuery("INSERT INTO weather.temperature(weather_station_id,event_time,temperature) VALUES (?, ?, ?);")
-                .with(field("weather_station_id"), field("event_time").now(), field("temperature")).build();
-    }
-
-    protected void executeAndAssertWith(final int maxQueries, final BaseCassandraBolt bolt) {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(SPOUT_MOCK, new WeatherSpout("test", maxQueries), 1)
-                .setMaxTaskParallelism(1);
-
-        builder.setBolt(BOLT_WRITER, bolt, 4)
-                .shuffleGrouping(SPOUT_MOCK);
-
-        runLocalTopologyAndWait(builder);
-
-        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature WHERE weather_station_id='test'");
-        Assert.assertEquals(maxQueries, rows.all().size());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3db96809/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
deleted file mode 100644
index 764a574..0000000
--- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/TridentTopologyTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.cassandra.trident;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import org.apache.storm.cassandra.CassandraContext;
-import org.apache.storm.cassandra.bolt.BaseTopologyTest;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-import org.apache.storm.cassandra.trident.state.CassandraQuery;
-import org.apache.storm.cassandra.trident.state.CassandraState;
-import org.apache.storm.cassandra.trident.state.CassandraStateFactory;
-import org.apache.storm.cassandra.trident.state.CassandraStateUpdater;
-import org.apache.storm.cassandra.trident.state.TridentResultSetValuesMapper;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.Random;
-
-import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
-import static org.apache.storm.cassandra.DynamicStatementBuilder.field;
-
-/**
- *
- */
-public class TridentTopologyTest extends BaseTopologyTest {
-
-    @Test
-    public void testTridentTopology() throws Exception {
-
-        Session session = cassandraCQLUnit.session;
-        String[] stationIds = {"station-1", "station-2", "station-3"};
-        for (int i = 1; i < 4; i++) {
-            ResultSet resultSet = session.execute("INSERT INTO weather.station(id, name) VALUES(?, ?)", stationIds[i-1],
-                    "Foo-Station-" + new Random().nextInt());
-        }
-
-        ResultSet rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.station");
-        for (Row row : rows) {
-            System.out.println("####### row = " + row);
-        }
-
-        WeatherBatchSpout weatherBatchSpout =
-                new WeatherBatchSpout(new Fields("weather_station_id", "temperature", "event_time"), 3,
-                        stationIds);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("cassandra-trident-stream", weatherBatchSpout);
-
-        CassandraStateFactory insertValuesStateFactory = getInsertTemperatureStateFactory();
-
-        CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
-
-        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
-        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
-        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
-
-        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
-
-        StormTopology stormTopology = topology.build();
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("wordCounter", getConfig(), stormTopology);
-        Thread.sleep(30 * 1000);
-
-        rows = cassandraCQLUnit.session.execute("SELECT * FROM weather.temperature");
-        Assert.assertTrue(rows.iterator().hasNext()); // basic sanity check
-
-        cluster.killTopology("wordCounter");
-        cluster.shutdown();
-    }
-
-    public static class PrintFunction extends BaseFunction {
-
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            System.out.println("####### tuple = " + tuple.getValues());
-            collector.emit(tuple.getValues());
-        }
-    }
-
-    private CassandraStateFactory getInsertTemperatureStateFactory() {
-        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
-        CQLStatementTupleMapper insertTemperatureValues = boundQuery(
-                "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
-                .bind(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature"))
-                .build();
-        options.withCQLStatementTupleMapper(insertTemperatureValues);
-        return new CassandraStateFactory(options);
-    }
-
-    public CassandraStateFactory getSelectWeatherStationStateFactory() {
-        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
-        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
-                .bind(field("weather_station_id").as("id")).build();
-        options.withCQLStatementTupleMapper(insertTemperatureValues);
-        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
-        return new CassandraStateFactory(options);
-    }
-}