Posted to user@storm.apache.org by Laurent Thoulon <la...@ldmobile.net> on 2014/01/03 10:27:14 UTC

hmsonline/storm-cassandra

Hi, 

We've been using Cassandra in our topologies for some time now. When we started, there was no CassandraState that suited our needs, so we basically reinvented the wheel based on an old CassandraState that used Hector. 
What we implemented lets the CassandraMapState use dynamic column family names, rowkeys and column names, and adds the ability to use Composites. By dynamic I mean they can be fetched from the tuple. 
It works nicely, but we've been seeing some performance issues when scaling and we think they may come from Hector's batch mutations. 

I'm not going to go through all our thoughts, but we also decided to rebuild our topologies to make them smaller, with fewer goals each, so we can pinpoint the bottlenecks more easily. 

For completeness, we're using Trident. 

Now, we're considering using Astyanax, so we thought it might be a good idea to try hmsonline/storm-cassandra as it's part of Storm's contrib. We've successfully implemented a basic use case but we're now facing some more complex ones. Our main problem is that the CassandraMapState seems to restrict us to a particular schema for the CFs: keys are composites, and the column name, column family and TTL are fixed in the options. These are the same kind of reasons that led us to refactor the CassandraMapState in the first place. We're actually surprised no one seems to have had the same needs, and we're thinking there may be a better approach to what we want to do that we haven't thought of. 

We have two kinds of topologies we're building: 
- Topologies that store counters in an opaque way in various column families (for various granularities) using rowkeys that can be composite or not and dynamic column names (timestamps, or composites made of ids and timestamps, depending on the current tuple) 
- Topologies that store, in a non-transactional way, a hashmap of <column name, column value> in a rowkey that depends on the tuple. 
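To make the opaque case concrete, here is a minimal sketch of the update rule usually associated with Trident's opaque state: each counter keeps the id of the last batch applied plus the values before and after it, so a replayed batch can be re-applied safely even if its content changed. The class and method names are hypothetical and not taken from the Gist, and a missing counter is assumed to start at 0.

```java
// Hypothetical sketch of an opaque counter update; names are illustrative.
final class OpaqueCounterSketch {

    /** Stored per counter: last batch id applied, value before it, value after it. */
    static final class Opaque {
        final long txid;
        final long prev;
        final long curr;

        Opaque(long txid, long prev, long curr) {
            this.txid = txid;
            this.prev = prev;
            this.curr = curr;
        }
    }

    /**
     * Applies one batch's delta. If the batch id equals the stored one, the
     * batch is a replay, so the update restarts from prev; otherwise the
     * stored curr becomes the new starting point.
     */
    static Opaque apply(Opaque stored, long batchTxid, long delta) {
        long base;
        if (stored == null) {
            base = 0L;                 // assumption: unseen counters start at 0
        } else if (stored.txid == batchTxid) {
            base = stored.prev;        // replay: discard the earlier attempt
        } else {
            base = stored.curr;        // new batch: build on the current value
        }
        return new Opaque(batchTxid, base, base + delta);
    }

    public static void main(String[] args) {
        Opaque v = apply(null, 1L, 5L);
        v = apply(v, 2L, 3L);
        v = apply(v, 2L, 4L); // batch 2 replayed with different content
        System.out.println(v.txid + " " + v.prev + " " + v.curr); // prints 2 5 9
    }
}
```

Storing three values per counter is what makes this heavier on Cassandra than a plain write: every update needs a read of the stored triple before the new one can be written.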

Does anyone have the same needs? 
Would you have any advice on how to achieve our goals in the most efficient way? 
Should we just use our own CassandraState and move it to Astyanax? 
We'd be glad to talk about this and share our knowledge with the community. 

If you'd like to see what we've done with our home-brewed CassandraState, I created this Gist: 
https://gist.github.com/Crystark/aca10845fb31f75e9b41 

Here's what a partitionPersist looks like: 

    .partitionPersist(
        getCassandraState(),
        new Fields("timestamp", "e", "a", "c", "r", "count"),
        new CassandraMultiputUpdater(CfStats.CF, new Fields("a", "c", "r", "e"), new Fields("timestamp"), new Fields("count"), CfStats.TTL)
    )
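One plausible reading of the updater arguments above, and it is only an assumption, is that the first Fields names the rowkey parts, the second the column name and the third the value. Under that reading, a batch could be grouped into one mutation per row along these lines. Plain Maps stand in for Trident tuples, the nested Map stands in for a Cassandra batch mutation, and all names are hypothetical rather than taken from the Gist.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: groups a batch of tuples into per-row column writes.
final class MultiputSketch {

    /** Builds a stand-in tuple from alternating field names and values. */
    static Map<String, Object> tuple(Object... namesAndValues) {
        Map<String, Object> t = new LinkedHashMap<String, Object>();
        for (int i = 0; i < namesAndValues.length; i += 2) {
            t.put((String) namesAndValues[i], namesAndValues[i + 1]);
        }
        return t;
    }

    /** Picks the named fields in order; several fields form a composite. */
    static List<Object> select(Map<String, Object> tuple, List<String> fields) {
        List<Object> parts = new ArrayList<Object>();
        for (String field : fields) {
            if (!tuple.containsKey(field)) {
                throw new IllegalArgumentException("missing field: " + field);
            }
            parts.add(tuple.get(field));
        }
        return parts;
    }

    /**
     * Returns rowKey -> (columnName -> value) so each row is written once
     * per batch. Summing values that hit the same column is an assumption.
     */
    static Map<List<Object>, Map<List<Object>, Long>> group(
            List<Map<String, Object>> batch,
            List<String> rowKeyFields,
            List<String> columnFields,
            String valueField) {
        Map<List<Object>, Map<List<Object>, Long>> rows =
                new LinkedHashMap<List<Object>, Map<List<Object>, Long>>();
        for (Map<String, Object> t : batch) {
            List<Object> rowKey = select(t, rowKeyFields);
            List<Object> column = select(t, columnFields);
            long value = ((Number) select(t, Arrays.asList(valueField)).get(0)).longValue();
            Map<List<Object>, Long> columns = rows.get(rowKey);
            if (columns == null) {
                columns = new LinkedHashMap<List<Object>, Long>();
                rows.put(rowKey, columns);
            }
            Long previous = columns.get(column);
            columns.put(column, previous == null ? value : previous + value);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = new ArrayList<Map<String, Object>>();
        batch.add(tuple("a", 1, "c", 2, "r", 3, "e", "click", "timestamp", 100L, "count", 3L));
        batch.add(tuple("a", 1, "c", 2, "r", 3, "e", "click", "timestamp", 100L, "count", 4L));
        System.out.println(group(batch,
                Arrays.asList("a", "c", "r", "e"), Arrays.asList("timestamp"), "count"));
    }
}
```

Grouping by row before writing keeps the number of mutations proportional to the number of distinct rows in the batch rather than the number of tuples.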

And what a stateQuery looks like: 

    .stateQuery(
        topology.newStaticState(getCassandraState()),
        new Fields("a", "c"),
        new CassandraMapGet(CfUser.CF, new Fields("a", "c")),
        // config in getCassandraState sets a limit of 1 and a range on columns for CfUser.CF
        new Fields("mapWithOneResult")
    )
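The "limit of 1 and a range on columns" mentioned in the comment behaves like a column slice: within one row, columns are ordered by name, and the query returns at most a fixed number of them between a start and an end name. The sketch below models that with a sorted map. It is an in-memory illustration with hypothetical names, not the actual CassandraMapGet implementation, and it assumes ascending order and inclusive bounds.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for one Cassandra row whose
// columns are sorted by name.
final class ColumnSliceSketch {

    /**
     * Returns up to `limit` columns with start <= name <= finish, in
     * ascending name order. Throws IllegalArgumentException if start > finish.
     */
    static Map<Long, String> slice(NavigableMap<Long, String> row,
                                   long start, long finish, int limit) {
        Map<Long, String> result = new LinkedHashMap<Long, String>();
        for (Map.Entry<Long, String> e : row.subMap(start, true, finish, true).entrySet()) {
            if (result.size() >= limit) {
                break;
            }
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> row = new TreeMap<Long, String>();
        row.put(10L, "x");
        row.put(20L, "y");
        row.put(30L, "z");
        System.out.println(slice(row, 15L, 30L, 1)); // prints {20=y}
    }
}
```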

Here are the versions we're running: 
- Java 6 
- Kafka 0.7 
- Storm 0.9.0-wip16 
- Cassandra 1.2.4 
We're considering upgrading these to Java 7, Kafka 0.8, Storm 0.9 and Cassandra 2. 

Thanks 
Regards 
Laurent 



Re: hmsonline/storm-cassandra

Posted by Brian O'Neill <bo...@alumni.brown.edu>.
Laurent,

Great input.  I agree.

For the same reasons, we've been looking at developing another
implementation of CassandraState that is less restrictive, leveraging the
CQL java-driver from Datastax.  CQL has better support for collections, as
well as lightweight transactions.  We plan to use both.
http://www.datastax.com/dev/blog/cql3_collections
http://www.datastax.com/dev/blog/lightweight-transactions-in-cassandra-2-0
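
For readers who have not used those two features, the statements below give a rough idea of what they look like in CQL 3 on Cassandra 2.0. The table and column names are hypothetical, the statements are held as plain strings so the sketch needs no driver, and nothing here reflects how a CQL-based CassandraState would actually be written.

```java
// Hypothetical table and column names; only the CQL syntax is the point.
final class CqlSketch {

    // A map collection holds <column name, column value> pairs in one row.
    static final String CREATE_TABLE =
        "CREATE TABLE user_attrs ("
        + "user_id text PRIMARY KEY, plan text, attrs map<text, text>)";

    // Adds or overwrites entries in the map without reading it first.
    static final String MERGE_INTO_MAP =
        "UPDATE user_attrs SET attrs = attrs + {'lang': 'fr'} WHERE user_id = 'u1'";

    // Lightweight transaction: the insert only applies if the row is absent.
    static final String INSERT_IF_ABSENT =
        "INSERT INTO user_attrs (user_id, plan) VALUES ('u1', 'free') IF NOT EXISTS";

    // Lightweight transaction: the update only applies if plan is still 'free'.
    static final String CONDITIONAL_UPDATE =
        "UPDATE user_attrs SET plan = 'pro' WHERE user_id = 'u1' IF plan = 'free'";

    public static void main(String[] args) {
        System.out.println(CREATE_TABLE);
        System.out.println(MERGE_INTO_MAP);
        System.out.println(INSERT_IF_ABSENT);
        System.out.println(CONDITIONAL_UPDATE);
    }
}
```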

If you aren't too far along, it might make sense to use CQL (rather than
Astyanax).

If you are interested in collaborating, shoot me a direct email.

bone AT alumni DOT brown DOT edu

all the best,
-brian

---
Brian O'Neill
Chief Architect
Health Market Science
The Science of Better Results
2700 Horizon Drive • King of Prussia, PA • 19406
M: 215.588.6024 • @boneill42 <http://www.twitter.com/boneill42> •
healthmarketscience.com


 

