Posted to user@cassandra.apache.org by Nathan Bijnens <na...@nathan.gs> on 2015/06/26 18:19:05 UTC

Slow reads on C* 2.0.15 using Spark Cassandra

We are using the Spark Cassandra Connector, version 1.2.0 (Spark 1.2.1),
connecting to a 6-node bare-metal Cassandra cluster (16 GB RAM, Xeon E3-1270
(8 cores), 4x 7.2k RPM SATA disks per node). Spark runs on a separate Mesos
cluster.

We are running a transformation job that reads the complete contents of a
table into Spark, applies some transformations, and writes the results back
to C*. We are using Spark to do a data migration in C*.

Before the job starts, the load on Cassandra is very low.

We notice incredibly slow reads: about 600 MB in an hour. We are reading at
LOCAL_ONE consistency.
The load_one on the Cassandra nodes increases from <1 to 60! There is no
CPU wait, only user & nice time.

The table & cassandra.yaml:
https://gist.github.com/nathan-gs/908a48aed8a0eb3c3183

Anyone have any idea?

Thanks,
  Nathan

Re: Slow reads on C* 2.0.15 using Spark Cassandra

Posted by Nathan Bijnens <na...@nathan.gs>.
One more update: it looks like the connector is generating CQL statements
like this:

SELECT "test_id", "channel", "ts", "event", "groups"
FROM "KEYSPACE"."test"
WHERE token("test_id") > ? AND token("test_id") <= ?
ALLOW FILTERING;

Best regards,
  Nathan

On Fri, Jun 26, 2015 at 8:16 PM Nathan Bijnens <na...@nathan.gs> wrote:

> Thanks for the suggestion, will take a look.
>
> Our code looks like this:
>
> val rdd = sc.cassandraTable[EventV0](keyspace, "test")
>
> val transformed = rdd.map{e => EventV1(e.testId, e.ts, e.channel, e.groups, e.event)}
> transformed.saveToCassandra(keyspace, "test_v1")
>
> Not sure whether this code could translate into a LIMIT under the hood.
>
> The total data in this table is about 2 GB on disk; the total data on each
> node is around 290 GB.
>
> On Fri, Jun 26, 2015 at 7:01 PM Nate McCall <na...@thelastpickle.com>
> wrote:
>
>> > We notice incredibly slow reads: about 600 MB in an hour. We are
>> > reading at LOCAL_ONE consistency.
>> > The load_one on the Cassandra nodes increases from <1 to 60! There is
>> > no CPU wait, only user & nice time.
>>
>> Without seeing the code and query, it's hard to tell, but I noticed
>> something similar when we had a client incorrectly using the 'take' method
>> for a result count like so:
>> val resultCount = query.take(count).length
>>
>> 'take' can call limit under the hood. The docs for the latter are
>> interesting:
>> "The limit will be applied for each created Spark partition. In other
>> words, unless the data are fetched from a single Cassandra partition the
>> number of results is unpredictable." [0]
>>
>> Removing that line (it wasn't necessary for the use case) and just relying
>> on a simple 'myRDD.select("my_col").toArray.foreach' got performance back
>> to where it should be. Per the docs, limit (and therefore take) works fine
>> as long as the partition key is used as a predicate in the WHERE clause
>> ("WHERE test_id = somevalue" in your example).
>>
>> [0]
>> https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L92-L101
>>
>> --
>> -----------------
>> Nate McCall
>> Austin, TX
>> @zznate
>>
>> Co-Founder & Sr. Technical Consultant
>> Apache Cassandra Consulting
>> http://www.thelastpickle.com
>>
>

Re: Slow reads on C* 2.0.15 using Spark Cassandra

Posted by Nathan Bijnens <na...@nathan.gs>.
Thanks for the suggestion, will take a look.

Our code looks like this:

val rdd = sc.cassandraTable[EventV0](keyspace, "test")

val transformed = rdd.map { e =>
  EventV1(e.testId, e.ts, e.channel, e.groups, e.event)
}
transformed.saveToCassandra(keyspace, "test_v1")

Not sure whether this code could translate into a LIMIT under the hood.

The total data in this table is about 2 GB on disk; the total data on each
node is around 290 GB.
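
For reference, the case classes are plain field-for-field mappings, along
these lines (a sketch only; the field types here are simplified guesses,
not the exact definitions):

// Types are assumptions for illustration; the real ones may differ.
case class EventV0(testId: String, channel: String, ts: Long,
                   event: String, groups: Set[String])
case class EventV1(testId: String, ts: Long, channel: String,
                   groups: Set[String], event: String)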

On Fri, Jun 26, 2015 at 7:01 PM Nate McCall <na...@thelastpickle.com> wrote:

> > We notice incredibly slow reads: about 600 MB in an hour. We are
> > reading at LOCAL_ONE consistency.
> > The load_one on the Cassandra nodes increases from <1 to 60! There is
> > no CPU wait, only user & nice time.
>
> Without seeing the code and query, it's hard to tell, but I noticed
> something similar when we had a client incorrectly using the 'take' method
> for a result count like so:
> val resultCount = query.take(count).length
>
> 'take' can call limit under the hood. The docs for the latter are
> interesting:
> "The limit will be applied for each created Spark partition. In other
> words, unless the data are fetched from a single Cassandra partition the
> number of results is unpredictable." [0]
>
> Removing that line (it wasn't necessary for the use case) and just relying
> on a simple 'myRDD.select("my_col").toArray.foreach' got performance back
> to where it should be. Per the docs, limit (and therefore take) works fine
> as long as the partition key is used as a predicate in the WHERE clause
> ("WHERE test_id = somevalue" in your example).
>
> [0]
> https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L92-L101
>
> --
> -----------------
> Nate McCall
> Austin, TX
> @zznate
>
> Co-Founder & Sr. Technical Consultant
> Apache Cassandra Consulting
> http://www.thelastpickle.com
>

Re: Slow reads on C* 2.0.15 using Spark Cassandra

Posted by Nate McCall <na...@thelastpickle.com>.
> We notice incredibly slow reads: about 600 MB in an hour. We are
> reading at LOCAL_ONE consistency.
> The load_one on the Cassandra nodes increases from <1 to 60! There is
> no CPU wait, only user & nice time.

Without seeing the code and query, it's hard to tell, but I noticed
something similar when we had a client incorrectly using the 'take' method
for a result count like so:
val resultCount = query.take(count).length

'take' can call limit under the hood. The docs for the latter are
interesting:
"The limit will be applied for each created Spark partition. In other
words, unless the data are fetched from a single Cassandra partition the
number of results is unpredictable." [0]

Removing that line (it wasn't necessary for the use case) and just relying
on a simple 'myRDD.select("my_col").toArray.foreach' got performance back
to where it should be. Per the docs, limit (and therefore take) works fine
as long as the partition key is used as a predicate in the WHERE clause
("WHERE test_id = somevalue" in your example).

[0]
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraRDD.scala#L92-L101
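
To make that concrete, here is a minimal sketch of the two patterns
(someTestId and count are placeholders, and sc/keyspace are assumed to be
in scope, as in your code):

import com.datastax.spark.connector._

// Anti-pattern on a full-table scan: take() applies its limit per Spark
// partition, so the resulting count is unpredictable (per the docs above).
val badCount = sc.cassandraTable(keyspace, "test").take(count).length

// Fine: the predicate pins the query to a single Cassandra partition,
// where a pushed-down LIMIT behaves as documented.
val someRows = sc.cassandraTable(keyspace, "test")
  .where("test_id = ?", someTestId)
  .take(count)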

--
-----------------
Nate McCall
Austin, TX
@zznate

Co-Founder & Sr. Technical Consultant
Apache Cassandra Consulting
http://www.thelastpickle.com