You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/12/11 07:41:22 UTC

[3/6] storm git commit: STORM-1211: Added doc in README on how to use trident state/query APIs

STORM-1211: Added doc in README on how to use trident state/query APIs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5cc697b7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5cc697b7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5cc697b7

Branch: refs/heads/master
Commit: 5cc697b745fd5f0cfaf940da74d963654efca228
Parents: ef9479d
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Thu Dec 3 17:44:47 2015 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Dec 8 11:40:23 2015 +0530

----------------------------------------------------------------------
 external/storm-cassandra/README.md | 28 ++++++++++++++++++++++++++++
 external/storm-cassandra/pom.xml   |  9 +++++++++
 2 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5cc697b7/external/storm-cassandra/README.md
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/README.md b/external/storm-cassandra/README.md
index fb77425..943fe5e 100644
--- a/external/storm-cassandra/README.md
+++ b/external/storm-cassandra/README.md
@@ -178,6 +178,34 @@ builder.setBolt("BOLT_WRITER", bolt, 4)
         .customGrouping("spout", new Murmur3StreamGrouping("title"))
 ```
 
+### Trident API support
+storm-cassandra support Trident `state` API for `inserting` data into Cassandra. 
+```java
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery(
+                "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
+                .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        CassandraStateFactory insertValuesStateFactory =  new CassandraStateFactory(options);
+        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
+        stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
+        stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
+```
+
+Below `state` API for `querying` data from Cassandra.
+```java
+        CassandraState.Options options = new CassandraState.Options(new CassandraContext());
+        CQLStatementTupleMapper insertTemperatureValues = boundQuery("SELECT name FROM weather.station WHERE id = ?")
+                 .bind(with(field("weather_station_id").as("id")));
+        options.withCQLStatementTupleMapper(insertTemperatureValues);
+        options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
+        CassandraStateFactory selectWeatherStationStateFactory =  new CassandraStateFactory(options);
+        CassandraStateFactory selectWeatherStationStateFactory = getSelectWeatherStationStateFactory();
+        TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
+        stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));         
+```
+
 ## License
 
 Licensed to the Apache Software Foundation (ASF) under one

http://git-wip-us.apache.org/repos/asf/storm/blob/5cc697b7/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index 86cd87d..446b18b 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -51,6 +51,15 @@
                 <role>developer</role>
             </roles>
         </developer>
+        <developer>
+            <id>satishd</id>
+            <name>Satish Duggana</name>
+            <email>satish.duggana@gmail.com</email>
+            <url>https://github.com/satishd</url>
+            <roles>
+                <role>developer</role>
+            </roles>
+        </developer>
     </developers>
 
     <build>