You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@cassandra.apache.org by Clint Kelly <cl...@gmail.com> on 2015/03/02 20:47:25 UTC

best practices for time-series data with massive amounts of records

Hi all,

I am designing an application that will capture time series data where we
expect the number of records per user to potentially be extremely high.  I
am not sure if we will eclipse the max row size of 2B elements, but I
assume that we would not want our application to approach that size anyway.

If we wanted to put all of the interactions in a single row, then I would
make a data model that looks like:

CREATE TABLE events (
  id text,
  event_time timestamp,
  event blob,
  PRIMARY KEY (id, event_time))
WITH CLUSTERING ORDER BY (event_time DESC);

The best practice for breaking up large rows of time series data is, as I
understand it, to put part of the time into the partitioning key (
http://planetcassandra.org/getting-started-with-time-series-data-modeling/):

CREATE TABLE events (
  id text,
  date text, // Could also use year+month here or year+week or something
else
  event_time timestamp,
  event blob,
  PRIMARY KEY ((id, date), event_time))
WITH CLUSTERING ORDER BY (event_time DESC);

The downside of this approach is that we can no longer do a simple
continuous scan to get all of the events for a given user.  Some users may
log lots and lots of interactions every day, while others may interact with
our application infrequently, so I'd like a quick way to get the most
recent interaction for a given user.

Has anyone used different approaches for this problem?

The only thing I can think of is to use the second table schema described
above, but switch to an order-preserving hashing function, and then
manually hash the "id" field.  This is essentially what we would do in
HBase.

Curious if anyone else has any thoughts.

Best regards,
Clint

Re: best practices for time-series data with massive amounts of records

Posted by Jack Krupansky <ja...@gmail.com>.
I'd recommend using 100K and 10M as rough guidelines for the maximum number
of rows and bytes in a single partition. Sure, Cassandra can technically
handle a lot more than that, but very large partitions can make your life
more difficult. Of course you will have to do a POC to validate the sweet
spot for your particular app, data model, actual data values, hardware, app
access patterns, and app latency requirements. It may be that your actual
numbers should be half or twice my guidance, but they are a starting point.

Back to your starting point: You really need to characterize the number of
"records" per user. For example, will you have a large number of users with
few records? IOW, what are the expected distributions for user count and
record per user count. Give some specific numbers. Even if you don't know
what the real numbers will be, you have to at least have a model for counts
before modeling the partition keys.

-- Jack Krupansky

On Mon, Mar 2, 2015 at 2:47 PM, Clint Kelly <cl...@gmail.com> wrote:

> Hi all,
>
> I am designing an application that will capture time series data where we
> expect the number of records per user to potentially be extremely high.  I
> am not sure if we will eclipse the max row size of 2B elements, but I
> assume that we would not want our application to approach that size anyway.
>
> If we wanted to put all of the interactions in a single row, then I would
> make a data model that looks like:
>
> CREATE TABLE events (
>   id text,
>   event_time timestamp,
>   event blob,
>   PRIMARY KEY (id, event_time))
> WITH CLUSTERING ORDER BY (event_time DESC);
>
> The best practice for breaking up large rows of time series data is, as I
> understand it, to put part of the time into the partitioning key (
> http://planetcassandra.org/getting-started-with-time-series-data-modeling/
> ):
>
> CREATE TABLE events (
>   id text,
>   date text, // Could also use year+month here or year+week or something
> else
>   event_time timestamp,
>   event blob,
>   PRIMARY KEY ((id, date), event_time))
> WITH CLUSTERING ORDER BY (event_time DESC);
>
> The downside of this approach is that we can no longer do a simple
> continuous scan to get all of the events for a given user.  Some users may
> log lots and lots of interactions every day, while others may interact with
> our application infrequently, so I'd like a quick way to get the most
> recent interaction for a given user.
>
> Has anyone used different approaches for this problem?
>
> The only thing I can think of is to use the second table schema described
> above, but switch to an order-preserving hashing function, and then
> manually hash the "id" field.  This is essentially what we would do in
> HBase.
>
> Curious if anyone else has any thoughts.
>
> Best regards,
> Clint
>
>
>

Re: best practices for time-series data with massive amounts of records

Posted by Jens Rantil <je...@tink.se>.
Hi,

I have not done something similar, however I have some comments:

On Mon, Mar 2, 2015 at 8:47 PM, Clint Kelly <cl...@gmail.com> wrote:

> The downside of this approach is that we can no longer do a simple
> continuous scan to get all of the events for a given user.
>

Sure, but would you really do that real time anyway? :) If you have
billions of events that's not going to scale anyway. Also, if you have
100000 events per bucket. The latency introduced by batching should be
manageable.


> Some users may log lots and lots of interactions every day, while others
> may interact with our application infrequently,
>

This makes another reason to split them up into bucket to make the cluster
partitions more manageble and homogenous.


> so I'd like a quick way to get the most recent interaction for a given
> user.
>

For this you could actually have a second table that stores the
last_time_bucket for a user. Upon event write, you could simply do an
update of the last_time_bucket. You could even have an index of all time
buckets per user if you want.


> Has anyone used different approaches for this problem?
>
> The only thing I can think of is to use the second table schema described
> above, but switch to an order-preserving hashing function, and then
> manually hash the "id" field.  This is essentially what we would do in
> HBase.
>

Like you might already know, this order preserving hashing is _not_
considered best practise in the Cassandra world.

Cheers,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.rantil@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook <https://www.facebook.com/#!/tink.se> Linkedin
<http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
 Twitter <https://twitter.com/tink>

Re: best practices for time-series data with massive amounts of records

Posted by Yulian Oifa <oi...@gmail.com>.
Hello
You can use timeuuid as raw key and create sepate CF to be used for indexing
Indexing CF may be either with user_id as key , or a better approach is to
partition row by timestamp.
In case of partition you can create compound key , in which you will store
user_id and timestamp base ( for example if you would like to keep 8 of 13
digits in timestamp , then new row will be created each 100000 seconds -
approximately each day , a bit more and maximum number of rows per user
would be 100K , of course you can play with number of rows/ time for each
row depending on number of records you are receiving. i am creating new row
each 11 days , so its 35 rows per year , per user ) )
In each column you can store timeuuid as name and empty value.

This way you keep you data ordered by time. The only disadvantage of this
approach is that you have to "glue" your data when you finished reading one
index row and started another one ( both asc and desc ).

When reading data you should first get slice depending on your needs from
index , and then get multi_range from original CF based on slice received.
Hope it helps
Best regards
Yulian Oifa



On Mon, Mar 2, 2015 at 9:47 PM, Clint Kelly <cl...@gmail.com> wrote:

> Hi all,
>
> I am designing an application that will capture time series data where we
> expect the number of records per user to potentially be extremely high.  I
> am not sure if we will eclipse the max row size of 2B elements, but I
> assume that we would not want our application to approach that size anyway.
>
> If we wanted to put all of the interactions in a single row, then I would
> make a data model that looks like:
>
> CREATE TABLE events (
>   id text,
>   event_time timestamp,
>   event blob,
>   PRIMARY KEY (id, event_time))
> WITH CLUSTERING ORDER BY (event_time DESC);
>
> The best practice for breaking up large rows of time series data is, as I
> understand it, to put part of the time into the partitioning key (
> http://planetcassandra.org/getting-started-with-time-series-data-modeling/
> ):
>
> CREATE TABLE events (
>   id text,
>   date text, // Could also use year+month here or year+week or something
> else
>   event_time timestamp,
>   event blob,
>   PRIMARY KEY ((id, date), event_time))
> WITH CLUSTERING ORDER BY (event_time DESC);
>
> The downside of this approach is that we can no longer do a simple
> continuous scan to get all of the events for a given user.  Some users may
> log lots and lots of interactions every day, while others may interact with
> our application infrequently, so I'd like a quick way to get the most
> recent interaction for a given user.
>
> Has anyone used different approaches for this problem?
>
> The only thing I can think of is to use the second table schema described
> above, but switch to an order-preserving hashing function, and then
> manually hash the "id" field.  This is essentially what we would do in
> HBase.
>
> Curious if anyone else has any thoughts.
>
> Best regards,
> Clint
>
>
>

Re: best practices for time-series data with massive amounts of records

Posted by Eric Stevens <mi...@gmail.com>.
It's probably quite rare for extremely large time series data to be
querying the whole set of data.  Instead there's almost always a "Between X
and Y dates" aspect to nearly every real time query you might have against
a table like this (with the exception of "most recent N events").

Because of this, time bucketing can be an effective strategy, though until
you understand your data better, it's hard to know how large (or small) to
make your buckets.  Because of *that*, I recommend using timestamp data
type for your bucketing strategy - this gives you the advantage of being
able to reduce your bucket sizes while keeping your at-rest data mostly
still quite accessible.

What I mean is that if you change your bucketing strategy from day to hour,
when you are querying across that changed time period, you can iterate at
the finer granularity buckets (hour), and you'll pick up the coarser
granularity (day) automatically for all but the earliest bucket (which is
easy to correct for when you're flooring your start bucket).  In the
coarser time period, most reads are partition key misses, which are
extremely inexpensive in Cassandra.

If you do need most-recent-N queries for broad ranges and you expect to
have some users whose clickrate is dramatically less frequent than your
bucket interval (making iterating over buckets inefficient), you can keep a
separate counter table with PK of ((user_id), bucket) in which you count
new events.  Now you can identify the exact set of buckets you need to read
to satisfy the query no matter what the user's click volume is (so very low
volume users have at most N partition keys queried, higher volume users
query fewer partition keys).

On Fri, Mar 6, 2015 at 4:06 PM, graham sanderson <gr...@vast.com> wrote:

> Note that using static column(s) for the “head” value, and trailing TTLed
> values behind is something we’re considering. Note this is especially nice
> if your head state includes say a map which is updated by small deltas
> (individual keys)
>
> We have not yet studied the effect of static columns on say DTCS
>
>
> On Mar 6, 2015, at 4:42 PM, Clint Kelly <cl...@gmail.com> wrote:
>
> Hi all,
>
> Thanks for the responses, this was very helpful.
>
> I don't know yet what the distribution of clicks and users will be, but I
> expect to see a few users with an enormous amount of interactions and most
> users having very few.  The idea of doing some additional manual
> partitioning, and then maintaining another table that contains the "head"
> partition for each user makes sense, although it would add additional
> latency when we want to get say the most recent 1000 interactions for a
> given user (which is something that we have to do sometimes for
> applications with tight SLAs).
>
> FWIW I doubt that any users will have so many interactions that they
> exceed what we could reasonably put in a row, but I wanted to have a
> strategy to deal with this.
>
> Having a nice design pattern in Cassandra for maintaining a row with the
> N-most-recent interactions would also solve this reasonably well, but I
> don't know of any way to implement that without running batch jobs that
> periodically clean out data (which might be okay).
>
> Best regards,
> Clint
>
>
>
>
> On Tue, Mar 3, 2015 at 8:10 AM, mck <mc...@apache.org> wrote:
>
>>
>> > Here "partition" is a random digit from 0 to (N*M)
>> > where N=nodes in cluster, and M=arbitrary number.
>>
>>
>> Hopefully it was obvious, but here (unless you've got hot partitions),
>> you don't need N.
>> ~mck
>>
>
>
>

Re: best practices for time-series data with massive amounts of records

Posted by graham sanderson <gr...@vast.com>.
Note that using static column(s) for the “head” value, and trailing TTLed values behind is something we’re considering. Note this is especially nice if your head state includes say a map which is updated by small deltas (individual keys)

We have not yet studied the effect of static columns on say DTCS

> On Mar 6, 2015, at 4:42 PM, Clint Kelly <cl...@gmail.com> wrote:
> 
> Hi all,
> 
> Thanks for the responses, this was very helpful.
> 
> I don't know yet what the distribution of clicks and users will be, but I expect to see a few users with an enormous amount of interactions and most users having very few.  The idea of doing some additional manual partitioning, and then maintaining another table that contains the "head" partition for each user makes sense, although it would add additional latency when we want to get say the most recent 1000 interactions for a given user (which is something that we have to do sometimes for applications with tight SLAs).
> 
> FWIW I doubt that any users will have so many interactions that they exceed what we could reasonably put in a row, but I wanted to have a strategy to deal with this.
> 
> Having a nice design pattern in Cassandra for maintaining a row with the N-most-recent interactions would also solve this reasonably well, but I don't know of any way to implement that without running batch jobs that periodically clean out data (which might be okay).
> 
> Best regards,
> Clint
> 
> 
> 
> 
> On Tue, Mar 3, 2015 at 8:10 AM, mck <mck@apache.org <ma...@apache.org>> wrote:
> 
> > Here "partition" is a random digit from 0 to (N*M)
> > where N=nodes in cluster, and M=arbitrary number.
> 
> 
> Hopefully it was obvious, but here (unless you've got hot partitions),
> you don't need N.
> ~mck
> 


Re: best practices for time-series data with massive amounts of records

Posted by Clint Kelly <cl...@gmail.com>.
Hi all,

Thanks for the responses, this was very helpful.

I don't know yet what the distribution of clicks and users will be, but I
expect to see a few users with an enormous amount of interactions and most
users having very few.  The idea of doing some additional manual
partitioning, and then maintaining another table that contains the "head"
partition for each user makes sense, although it would add additional
latency when we want to get say the most recent 1000 interactions for a
given user (which is something that we have to do sometimes for
applications with tight SLAs).

FWIW I doubt that any users will have so many interactions that they exceed
what we could reasonably put in a row, but I wanted to have a strategy to
deal with this.

Having a nice design pattern in Cassandra for maintaining a row with the
N-most-recent interactions would also solve this reasonably well, but I
don't know of any way to implement that without running batch jobs that
periodically clean out data (which might be okay).

Best regards,
Clint




On Tue, Mar 3, 2015 at 8:10 AM, mck <mc...@apache.org> wrote:

>
> > Here "partition" is a random digit from 0 to (N*M)
> > where N=nodes in cluster, and M=arbitrary number.
>
>
> Hopefully it was obvious, but here (unless you've got hot partitions),
> you don't need N.
> ~mck
>

Re: best practices for time-series data with massive amounts of records

Posted by mck <mc...@apache.org>.
> Here "partition" is a random digit from 0 to (N*M) 
> where N=nodes in cluster, and M=arbitrary number.


Hopefully it was obvious, but here (unless you've got hot partitions),
you don't need N.
~mck

Re: best practices for time-series data with massive amounts of records

Posted by mck <mc...@apache.org>.
Clint,

> CREATE TABLE events (
>   id text,
>   date text, // Could also use year+month here or year+week or something else
>   event_time timestamp,
>   event blob,
>   PRIMARY KEY ((id, date), event_time))
> WITH CLUSTERING ORDER BY (event_time DESC);
> 
> The downside of this approach is that we can no longer do a simple
> continuous scan to get all of the events for a given user.  Some users
> may log lots and lots of interactions every day, while others may interact
> with our application infrequently, so I'd like a quick way to get the most
> recent interaction for a given user.
> 
> Has anyone used different approaches for this problem?


One idea is to provide additional manual partitioning like…

CREATE TABLE events (
  user_id text,
  partition int,
  event_time timeuuid,
  event_json text,
  PRIMARY KEY ((user_id, partition), event_time)
) WITH
  CLUSTERING ORDER BY (event_time DESC) AND
  compaction={'class': 'DateTieredCompactionStrategy'};


Here "partition" is a random digit from 0 to (N*M) 
where N=nodes in cluster, and M=arbitrary number.

Read performance is going to suffer a little because you need to query
N*M as many partition keys for each read, but should be constant enough
that it comes down to increasing the cluster's hardware and scaling out
as need be.

The multikey reads you can do it with a SELECT…IN query, or better yet
with parallel reads (less pressure on the coordinator at expense of 
extra network calls).

Starting with M=1, you have the option to increase it over time if the
rows in partitions for any users get too high.
(We do¹ something similar for storing all raw events in our enterprise
platform, but because the data is not user-centric the initial partition
key is minute-by-minute timebuckets, and M has remained at 1 the whole
time).

This approach is better than using order-preserving partition (really
don't do that).

I would also consider replacing "event blob" with "event text", choosing
json instead of any binary serialisation. We've learnt the hard way the
value of data transparency, and i'm guessing the storage cost is small
given c* compression.

Otherwise the advice here is largely repeating what Jens has already
said.

~mck

  ¹ slide 19+20 from
  https://prezi.com/vt98oob9fvo4/cassandra-summit-cassandra-and-hadoop-at-finnno/