Posted to user@cassandra.apache.org by Alex Kotelnikov <al...@diginetica.com> on 2017/08/16 16:50:48 UTC

Full table scan with cassandra

Hey,

we are trying Cassandra as an alternative store for the huge stream of data
coming from our customers.

Storing works quite well, and I have started to validate how retrieval
performs. We have two types of retrieval: fetching specific records and bulk
reads for general analysis.
Fetching a single record works like a charm, but bulk fetching does not.

With a moderately small table of ~2 million records (~10 GB of raw data) I
observed very slow operation using token(partition key) ranges: it takes
minutes to perform a full retrieval. We tried a couple of configurations,
on both virtual machines and real hardware, and overall it looks like it is
not possible to retrieve all table data in a reasonable time (by reasonable
I mean that since we have a 1 Gbit network, 10 GB can be transferred in a
couple of minutes from one server to another, and with 10+ Cassandra servers
and 10+ Spark executors the total time should be even smaller).

I tried the DataStax Spark connector. I also wrote a simple test case using
the DataStax Java driver and saw that fetching 10k records takes ~10 s, so I
assume a "sequential" scan would take 200x more time, i.e. ~30 minutes.

Maybe we are totally wrong in trying to use Cassandra this way?

-- 

Best Regards,


*Alexander Kotelnikov*

*Team Lead*

DIGINETICA
Retail Technology Company

m: +7.921.915.06.28

*www.diginetica.com <http://www.diginetica.com/>*

Re: Full table scan with cassandra

Posted by Ben Bromhead <be...@instaclustr.com>.
Apache Cassandra is not great in terms of performance at the moment for
batch analytics workloads that require a full table scan. I would look at
FiloDB for all the benefits and familiarity of Cassandra with better
streaming and analytics performance: https://github.com/filodb/FiloDB

There are also some outstanding tickets around improving bulk reads in
Cassandra (see https://issues.apache.org/jira/browse/CASSANDRA-9259 for the
full gory details), but the effort appears to have been abandoned by the
initial set of contributors.

On Wed, 16 Aug 2017 at 09:51 Alex Kotelnikov <al...@diginetica.com>
wrote:

> Hey,
>
> we are trying Cassandra as an alternative for storage huge stream of data
> coming from our customers.
>
> Storing works quite fine, and I started to validate how retrieval does. We
> have two types of that: fetching specific records and bulk retrieval for
> general analysis.
> Fetching single record works like charm. But it is not so with bulk fetch.
>
> With a moderately small table of ~2 million records, ~10Gb raw data I
> observed very slow operation (using token(partition key) ranges). It takes
> minutes to perform full retrieval. We tried a couple of configurations
> using virtual machines, real hardware and overall looks like it is not
> possible to all table data in a reasonable time (by reasonable I mean that
> since we have 1Gbit network 10Gb can be transferred in a couple of minutes
> from one server to another and when we have 10+ cassandra servers and 10+
> spark executors total time should be even smaller).
>
> I tried datastax spark connector. Also I wrote a simple test case using
> datastax java driver and see how fetch of 10k records takes ~10s so I
> assume that "sequential" scan will take 200x more time, equals ~30 minutes.
>
> May be we are totally wrong trying to use Cassandra this way?
>
> --
>
> Best Regards,
>
>
> *Alexander Kotelnikov*
>
> *Team Lead*
>
> DIGINETICA
> Retail Technology Company
>
> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>
> *www.diginetica.com <http://www.diginetica.com/>*
>
-- 
Ben Bromhead
CTO | Instaclustr <https://www.instaclustr.com/>
+1 650 284 9692
Managed Cassandra / Spark on AWS, Azure and Softlayer

Re: Full table scan with cassandra

Posted by Dmitry Saprykin <sa...@gmail.com>.
Hi Alex,

How do you generate your subrange set for running queries?
It may happen that some of your ranges cross data ownership range
borders (check by running 'nodetool describering [keyspace_name]').
Those range queries will be highly ineffective in that case, which could
explain your results.
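
For illustration, aligning subranges to ownership borders can be sketched
like this (a hypothetical helper; it assumes the border tokens have already
been parsed out of the 'nodetool describering' output):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Split one scan range [start, end) at the ring's ownership borders so
// that no single token-range query spans two ownership ranges.
// `borders` holds the end tokens of the ownership ranges, e.g. parsed
// from `nodetool describering <keyspace>`.
class AlignRanges {
    static List<long[]> align(long start, long end, TreeSet<Long> borders) {
        List<long[]> out = new ArrayList<>();
        long cur = start;
        // every border strictly inside (start, end) becomes a cut point
        for (long b : borders.subSet(start, false, end, false)) {
            out.add(new long[]{cur, b});
            cur = b;
        }
        out.add(new long[]{cur, end});
        return out;
    }
}
```

Each resulting subrange then falls entirely inside one ownership range, so
the query can be served locally by that range's replicas.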

You could also consider using LOCAL_ONE consistency for your full scans. You
may lose some consistency guarantees but will gain a lot of performance.

Kind regards,
Dmitry Saprykin

On Thu, Aug 17, 2017 at 12:36 PM, Alex Kotelnikov <
alex.kotelnikov@diginetica.com> wrote:

> Dor,
>
> I believe, I tried it in many ways and the result is quite disappointing.
> I've run my scans on 3 different clusters, one of which was using on VMs
> and I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to see,
> how this affects the performance.
>
> I also generated the flow from spark cluster ranging from 4 to 40 parallel
> tasks as well as just multi-threaded client.
>
> The surprise is that trivial fetch of all records using token ranges takes
> pretty much the same time in all setups.
>
> The only beneficial thing I've learned is that it is much more efficient
> to create a MATERIALIZED VIEW than to filter (even using secondary index).
>
> Say, I have a typical dataset, around 3Gb of data, 1M records. And I have
> a trivial scan practice:
>
> String.format("SELECT token(user_id), user_id, events FROM user_events
> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
> token(user_id) < %d ", end) : "")
>
> I split all tokens into start-end ranges (except for last range, which
> only has start) and query ranges in multiple threads, up to 40.
>
> Whole process takes ~40s on 3 VMs cluster  2+2+4 cores, 16Gb RAM each 1
> virtual disk. And it takes ~30s on real hardware clusters
> 8servers*8cores*32Gb. Level of the concurrency does not matter pretty much
> at all. Util it is too high or too low.
> Size of tokens range matters, but here I see the rule "make it larger, but
> avoid cassandra timeouts".
> I also tried spark connector to validate that my test multithreaded app is
> not the bottleneck. It is not.
>
> I expected some kind of elasticity, I see none. Feels like I do something
> wrong...
>
>
>
> On 17 August 2017 at 00:19, Dor Laor <do...@scylladb.com> wrote:
>
>> Hi Alex,
>>
>> You probably didn't get the paralelism right. Serial scan has
>> a paralelism of one. If the paralelism isn't large enough, perf will be
>> slow.
>> If paralelism is too large, Cassandra and the disk will trash and have too
>> many context switches.
>>
>> So you need to find your cluster's sweet spot. We documented the procedure
>> to do it in this blog: http://www.scylladb.com/
>> 2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>> and the results are here: http://www.scylladb.com/
>> 2017/03/28/parallel-efficient-full-table-scan-scylla/
>> The algorithm should translate to Cassandra but you'll have to use
>> different rules of the thumb.
>>
>> Best,
>> Dor
>>
>>
>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>> alex.kotelnikov@diginetica.com> wrote:
>>
>>> Hey,
>>>
>>> we are trying Cassandra as an alternative for storage huge stream of
>>> data coming from our customers.
>>>
>>> Storing works quite fine, and I started to validate how retrieval does.
>>> We have two types of that: fetching specific records and bulk retrieval for
>>> general analysis.
>>> Fetching single record works like charm. But it is not so with bulk
>>> fetch.
>>>
>>> With a moderately small table of ~2 million records, ~10Gb raw data I
>>> observed very slow operation (using token(partition key) ranges). It takes
>>> minutes to perform full retrieval. We tried a couple of configurations
>>> using virtual machines, real hardware and overall looks like it is not
>>> possible to all table data in a reasonable time (by reasonable I mean that
>>> since we have 1Gbit network 10Gb can be transferred in a couple of minutes
>>> from one server to another and when we have 10+ cassandra servers and 10+
>>> spark executors total time should be even smaller).
>>>
>>> I tried datastax spark connector. Also I wrote a simple test case using
>>> datastax java driver and see how fetch of 10k records takes ~10s so I
>>> assume that "sequential" scan will take 200x more time, equals ~30 minutes.
>>>
>>> May be we are totally wrong trying to use Cassandra this way?
>>>
>>> --
>>>
>>> Best Regards,
>>>
>>>
>>> *Alexander Kotelnikov*
>>>
>>> *Team Lead*
>>>
>>> DIGINETICA
>>> Retail Technology Company
>>>
>>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>>
>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>
>>
>>
>
>
> --
>
> Best Regards,
>
>
> *Alexander Kotelnikov*
>
> *Team Lead*
>
> DIGINETICA
> Retail Technology Company
>
> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>
> *www.diginetica.com <http://www.diginetica.com/>*
>

Re: Full table scan with cassandra

Posted by Alex Kotelnikov <al...@diginetica.com>.
So it is also terribly slow.

It does not work with materialized views (a quick hack for that is below) or
with UDTs; the latter requires more time to fix.

So I used it to retrieve only the key, the one column with a built-in type.
To make the task more time-consuming I extended the dataset a bit, to ~2.5M
records.

My multithreaded dumper, my Spark job using the connector, and
cassandra-loader's unload all retrieve the 2.5M records in ~1m20s. Increased
concurrency halves this time. But that holds only for this trivial case of
fetching the key alone, with no actual payload.
Fetching the actual user data for these 2.5M records takes ~2m20s, and that
does not go down with the number of threads or Spark cores.

I feel stupid. More than two minutes to transfer ~10 GB in a highly
distributed system.


index c570574..e7af28b 100644
--- a/src/main/java/com/datastax/loader/CqlDelimParser.java
+++ b/src/main/java/com/datastax/loader/CqlDelimParser.java
@@ -20,6 +20,7 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.exceptions.InvalidTypeException;
 import com.datastax.loader.parser.BigDecimalParser;
@@ -205,9 +206,10 @@ public class CqlDelimParser {
             System.err.println("Keyspace " + keyspace + " not found.");
             System.exit(-1);
         }
-        TableMetadata tm = km.getTable(tablename);
+        AbstractTableMetadata tm = km.getTable(tablename);
+        if ( tm == null ) tm = km.getMaterializedView(tablename);
         if (null == tm) {
-            System.err.println("Table " + tablename + " not found.");
+            System.err.println("Table/view " + tablename + " not found.");
             System.exit(-1);
         }
         List<String> inList = new ArrayList<String>();
diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java
b/src/main/java/com/datastax/loader/CqlDelimUnload.java
index 472e33b..f084ce8 100644
--- a/src/main/java/com/datastax/loader/CqlDelimUnload.java
+++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java
@@ -56,6 +56,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;

+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ColumnMetadata;
@@ -547,9 +548,12 @@ public class CqlDelimUnload {
                 table = table.replaceAll("\"", "");
             else
                 table = table.toLowerCase();
-
-            List<ColumnMetadata> lcm = session.getCluster().getMetadata()
-                .getKeyspace(keyspace).getTable(table).getPartitionKey();
+
+            AbstractTableMetadata tm = session.getCluster().getMetadata()
+                .getKeyspace(keyspace).getTable(table);
+            if ( tm == null )
+              tm = session.getCluster().getMetadata().getKeyspace(keyspace).getMaterializedView(table);
+            List<ColumnMetadata> lcm = tm.getPartitionKey();
             String partitionKey = lcm.get(0).getName();
             for (int i = 1; i < lcm.size(); i++) {
                 partitionKey = partitionKey + "," + lcm.get(i).getName();


On 17 August 2017 at 21:11, Jeff Jirsa <jj...@gmail.com> wrote:

> Brian Hess has perhaps the best open source code example of the right way
> to do this:
>
> https://github.com/brianmhess/cassandra-loader/blob/master/
> src/main/java/com/datastax/loader/CqlDelimUnload.java
>
>
>
> On Thu, Aug 17, 2017 at 10:00 AM, Alex Kotelnikov <
> alex.kotelnikov@diginetica.com> wrote:
>
>> yup, user_id is the primary key.
>>
>> First of all,can you share, how to "go to a node directly"?.
>>
>> Also such approach will retrieve all the data RF times, coordinator
>> should have enough metadata to avoid that.
>>
>> Should not requesting multiple coordinators provide certain concurrency?
>>
>> On 17 August 2017 at 19:54, Dor Laor <do...@scylladb.com> wrote:
>>
>>> On Thu, Aug 17, 2017 at 9:36 AM, Alex Kotelnikov <
>>> alex.kotelnikov@diginetica.com> wrote:
>>>
>>>> Dor,
>>>>
>>>> I believe, I tried it in many ways and the result is quite
>>>> disappointing.
>>>> I've run my scans on 3 different clusters, one of which was using on
>>>> VMs and I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to
>>>> see, how this affects the performance.
>>>>
>>>> I also generated the flow from spark cluster ranging from 4 to 40
>>>> parallel tasks as well as just multi-threaded client.
>>>>
>>>> The surprise is that trivial fetch of all records using token ranges
>>>> takes pretty much the same time in all setups.
>>>>
>>>> The only beneficial thing I've learned is that it is much more
>>>> efficient to create a MATERIALIZED VIEW than to filter (even using
>>>> secondary index).
>>>>
>>>> Say, I have a typical dataset, around 3Gb of data, 1M records. And I
>>>> have a trivial scan practice:
>>>>
>>>> String.format("SELECT token(user_id), user_id, events FROM user_events
>>>> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
>>>> token(user_id) < %d ", end) : "")
>>>>
>>>
>>> Is user_id the primary key? Looks like this query will just go to the
>>> cluster and access a random coordinator each time.
>>> C* doesn't save the subsequent token on the same node. It's hashed.
>>> The idea of parallel cluster scan is to go directly to all nodes in
>>> parallel and query them for the hashed keys they own.
>>>
>>>
>>>> I split all tokens into start-end ranges (except for last range, which
>>>> only has start) and query ranges in multiple threads, up to 40.
>>>>
>>>> Whole process takes ~40s on 3 VMs cluster  2+2+4 cores, 16Gb RAM each 1
>>>> virtual disk. And it takes ~30s on real hardware clusters
>>>> 8servers*8cores*32Gb. Level of the concurrency does not matter pretty much
>>>> at all. Util it is too high or too low.
>>>> Size of tokens range matters, but here I see the rule "make it larger,
>>>> but avoid cassandra timeouts".
>>>> I also tried spark connector to validate that my test multithreaded app
>>>> is not the bottleneck. It is not.
>>>>
>>>> I expected some kind of elasticity, I see none. Feels like I do
>>>> something wrong...
>>>>
>>>>
>>>>
>>>> On 17 August 2017 at 00:19, Dor Laor <do...@scylladb.com> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> You probably didn't get the paralelism right. Serial scan has
>>>>> a paralelism of one. If the paralelism isn't large enough, perf will
>>>>> be slow.
>>>>> If paralelism is too large, Cassandra and the disk will trash and have
>>>>> too
>>>>> many context switches.
>>>>>
>>>>> So you need to find your cluster's sweet spot. We documented the
>>>>> procedure
>>>>> to do it in this blog: http://www.scylladb.com/
>>>>> 2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>>>>> and the results are here: http://www.scylladb.com/
>>>>> 2017/03/28/parallel-efficient-full-table-scan-scylla/
>>>>> The algorithm should translate to Cassandra but you'll have to use
>>>>> different rules of the thumb.
>>>>>
>>>>> Best,
>>>>> Dor
>>>>>
>>>>>
>>>>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>>>>> alex.kotelnikov@diginetica.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> we are trying Cassandra as an alternative for storage huge stream of
>>>>>> data coming from our customers.
>>>>>>
>>>>>> Storing works quite fine, and I started to validate how retrieval
>>>>>> does. We have two types of that: fetching specific records and bulk
>>>>>> retrieval for general analysis.
>>>>>> Fetching single record works like charm. But it is not so with bulk
>>>>>> fetch.
>>>>>>
>>>>>> With a moderately small table of ~2 million records, ~10Gb raw data I
>>>>>> observed very slow operation (using token(partition key) ranges). It takes
>>>>>> minutes to perform full retrieval. We tried a couple of configurations
>>>>>> using virtual machines, real hardware and overall looks like it is not
>>>>>> possible to all table data in a reasonable time (by reasonable I mean that
>>>>>> since we have 1Gbit network 10Gb can be transferred in a couple of minutes
>>>>>> from one server to another and when we have 10+ cassandra servers and 10+
>>>>>> spark executors total time should be even smaller).
>>>>>>
>>>>>> I tried datastax spark connector. Also I wrote a simple test case
>>>>>> using datastax java driver and see how fetch of 10k records takes ~10s so I
>>>>>> assume that "sequential" scan will take 200x more time, equals ~30 minutes.
>>>>>>
>>>>>> May be we are totally wrong trying to use Cassandra this way?
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>>
>>>>>> *Alexander Kotelnikov*
>>>>>>
>>>>>> *Team Lead*
>>>>>>
>>>>>> DIGINETICA
>>>>>> Retail Technology Company
>>>>>>
>>>>>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>>>>>
>>>>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Best Regards,
>>>>
>>>>
>>>> *Alexander Kotelnikov*
>>>>
>>>> *Team Lead*
>>>>
>>>> DIGINETICA
>>>> Retail Technology Company
>>>>
>>>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>>>
>>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Best Regards,
>>
>>
>> *Alexander Kotelnikov*
>>
>> *Team Lead*
>>
>> DIGINETICA
>> Retail Technology Company
>>
>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>
>> *www.diginetica.com <http://www.diginetica.com/>*
>>
>
>


-- 

Best Regards,


*Alexander Kotelnikov*

*Team Lead*

DIGINETICA
Retail Technology Company

m: +7.921.915.06.28

*www.diginetica.com <http://www.diginetica.com/>*

Re: Full table scan with cassandra

Posted by Jeff Jirsa <jj...@gmail.com>.
Brian Hess has perhaps the best open-source example of the right way to do
this:

https://github.com/brianmhess/cassandra-loader/blob/master/src/main/java/com/datastax/loader/CqlDelimUnload.java



On Thu, Aug 17, 2017 at 10:00 AM, Alex Kotelnikov <
alex.kotelnikov@diginetica.com> wrote:

> yup, user_id is the primary key.
>
> First of all,can you share, how to "go to a node directly"?.
>
> Also such approach will retrieve all the data RF times, coordinator should
> have enough metadata to avoid that.
>
> Should not requesting multiple coordinators provide certain concurrency?
>
> On 17 August 2017 at 19:54, Dor Laor <do...@scylladb.com> wrote:
>
>> On Thu, Aug 17, 2017 at 9:36 AM, Alex Kotelnikov <
>> alex.kotelnikov@diginetica.com> wrote:
>>
>>> Dor,
>>>
>>> I believe, I tried it in many ways and the result is quite disappointing.
>>> I've run my scans on 3 different clusters, one of which was using on VMs
>>> and I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to see,
>>> how this affects the performance.
>>>
>>> I also generated the flow from spark cluster ranging from 4 to 40
>>> parallel tasks as well as just multi-threaded client.
>>>
>>> The surprise is that trivial fetch of all records using token ranges
>>> takes pretty much the same time in all setups.
>>>
>>> The only beneficial thing I've learned is that it is much more efficient
>>> to create a MATERIALIZED VIEW than to filter (even using secondary index).
>>>
>>> Say, I have a typical dataset, around 3Gb of data, 1M records. And I
>>> have a trivial scan practice:
>>>
>>> String.format("SELECT token(user_id), user_id, events FROM user_events
>>> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
>>> token(user_id) < %d ", end) : "")
>>>
>>
>> Is user_id the primary key? Looks like this query will just go to the
>> cluster and access a random coordinator each time.
>> C* doesn't save the subsequent token on the same node. It's hashed.
>> The idea of parallel cluster scan is to go directly to all nodes in
>> parallel and query them for the hashed keys they own.
>>
>>
>>> I split all tokens into start-end ranges (except for last range, which
>>> only has start) and query ranges in multiple threads, up to 40.
>>>
>>> Whole process takes ~40s on 3 VMs cluster  2+2+4 cores, 16Gb RAM each 1
>>> virtual disk. And it takes ~30s on real hardware clusters
>>> 8servers*8cores*32Gb. Level of the concurrency does not matter pretty much
>>> at all. Util it is too high or too low.
>>> Size of tokens range matters, but here I see the rule "make it larger,
>>> but avoid cassandra timeouts".
>>> I also tried spark connector to validate that my test multithreaded app
>>> is not the bottleneck. It is not.
>>>
>>> I expected some kind of elasticity, I see none. Feels like I do
>>> something wrong...
>>>
>>>
>>>
>>> On 17 August 2017 at 00:19, Dor Laor <do...@scylladb.com> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> You probably didn't get the paralelism right. Serial scan has
>>>> a paralelism of one. If the paralelism isn't large enough, perf will be
>>>> slow.
>>>> If paralelism is too large, Cassandra and the disk will trash and have
>>>> too
>>>> many context switches.
>>>>
>>>> So you need to find your cluster's sweet spot. We documented the
>>>> procedure
>>>> to do it in this blog: http://www.scylladb.com/
>>>> 2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>>>> and the results are here: http://www.scylladb.com/
>>>> 2017/03/28/parallel-efficient-full-table-scan-scylla/
>>>> The algorithm should translate to Cassandra but you'll have to use
>>>> different rules of the thumb.
>>>>
>>>> Best,
>>>> Dor
>>>>
>>>>
>>>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>>>> alex.kotelnikov@diginetica.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> we are trying Cassandra as an alternative for storage huge stream of
>>>>> data coming from our customers.
>>>>>
>>>>> Storing works quite fine, and I started to validate how retrieval
>>>>> does. We have two types of that: fetching specific records and bulk
>>>>> retrieval for general analysis.
>>>>> Fetching single record works like charm. But it is not so with bulk
>>>>> fetch.
>>>>>
>>>>> With a moderately small table of ~2 million records, ~10Gb raw data I
>>>>> observed very slow operation (using token(partition key) ranges). It takes
>>>>> minutes to perform full retrieval. We tried a couple of configurations
>>>>> using virtual machines, real hardware and overall looks like it is not
>>>>> possible to all table data in a reasonable time (by reasonable I mean that
>>>>> since we have 1Gbit network 10Gb can be transferred in a couple of minutes
>>>>> from one server to another and when we have 10+ cassandra servers and 10+
>>>>> spark executors total time should be even smaller).
>>>>>
>>>>> I tried datastax spark connector. Also I wrote a simple test case
>>>>> using datastax java driver and see how fetch of 10k records takes ~10s so I
>>>>> assume that "sequential" scan will take 200x more time, equals ~30 minutes.
>>>>>
>>>>> May be we are totally wrong trying to use Cassandra this way?
>>>>>
>>>>> --
>>>>>
>>>>> Best Regards,
>>>>>
>>>>>
>>>>> *Alexander Kotelnikov*
>>>>>
>>>>> *Team Lead*
>>>>>
>>>>> DIGINETICA
>>>>> Retail Technology Company
>>>>>
>>>>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>>>>
>>>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Best Regards,
>>>
>>>
>>> *Alexander Kotelnikov*
>>>
>>> *Team Lead*
>>>
>>> DIGINETICA
>>> Retail Technology Company
>>>
>>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>>
>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>
>>
>>
>
>
> --
>
> Best Regards,
>
>
> *Alexander Kotelnikov*
>
> *Team Lead*
>
> DIGINETICA
> Retail Technology Company
>
> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>
> *www.diginetica.com <http://www.diginetica.com/>*
>

Re: Full table scan with cassandra

Posted by Alex Kotelnikov <al...@diginetica.com>.
yup, user_id is the primary key.

First of all, can you share how to "go to a node directly"?

Also, such an approach will retrieve all the data RF times; the coordinator
should have enough metadata to avoid that.

Shouldn't requesting multiple coordinators provide a certain degree of
concurrency?

On 17 August 2017 at 19:54, Dor Laor <do...@scylladb.com> wrote:

> On Thu, Aug 17, 2017 at 9:36 AM, Alex Kotelnikov <
> alex.kotelnikov@diginetica.com> wrote:
>
>> Dor,
>>
>> I believe, I tried it in many ways and the result is quite disappointing.
>> I've run my scans on 3 different clusters, one of which was using on VMs
>> and I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to see,
>> how this affects the performance.
>>
>> I also generated the flow from spark cluster ranging from 4 to 40
>> parallel tasks as well as just multi-threaded client.
>>
>> The surprise is that trivial fetch of all records using token ranges
>> takes pretty much the same time in all setups.
>>
>> The only beneficial thing I've learned is that it is much more efficient
>> to create a MATERIALIZED VIEW than to filter (even using secondary index).
>>
>> Say, I have a typical dataset, around 3Gb of data, 1M records. And I have
>> a trivial scan practice:
>>
>> String.format("SELECT token(user_id), user_id, events FROM user_events
>> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
>> token(user_id) < %d ", end) : "")
>>
>
> Is user_id the primary key? Looks like this query will just go to the
> cluster and access a random coordinator each time.
> C* doesn't save the subsequent token on the same node. It's hashed.
> The idea of parallel cluster scan is to go directly to all nodes in
> parallel and query them for the hashed keys they own.
>
>
>> I split all tokens into start-end ranges (except for last range, which
>> only has start) and query ranges in multiple threads, up to 40.
>>
>> Whole process takes ~40s on 3 VMs cluster  2+2+4 cores, 16Gb RAM each 1
>> virtual disk. And it takes ~30s on real hardware clusters
>> 8servers*8cores*32Gb. Level of the concurrency does not matter pretty much
>> at all. Util it is too high or too low.
>> Size of tokens range matters, but here I see the rule "make it larger,
>> but avoid cassandra timeouts".
>> I also tried spark connector to validate that my test multithreaded app
>> is not the bottleneck. It is not.
>>
>> I expected some kind of elasticity, I see none. Feels like I do something
>> wrong...
>>
>>
>>
>> On 17 August 2017 at 00:19, Dor Laor <do...@scylladb.com> wrote:
>>
>>> Hi Alex,
>>>
>>> You probably didn't get the paralelism right. Serial scan has
>>> a paralelism of one. If the paralelism isn't large enough, perf will be
>>> slow.
>>> If paralelism is too large, Cassandra and the disk will trash and have
>>> too
>>> many context switches.
>>>
>>> So you need to find your cluster's sweet spot. We documented the
>>> procedure
>>> to do it in this blog: http://www.scylladb.com/
>>> 2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>>> and the results are here: http://www.scylladb.com/
>>> 2017/03/28/parallel-efficient-full-table-scan-scylla/
>>> The algorithm should translate to Cassandra but you'll have to use
>>> different rules of the thumb.
>>>
>>> Best,
>>> Dor
>>>
>>>
>>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>>> alex.kotelnikov@diginetica.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> we are trying Cassandra as an alternative for storage huge stream of
>>>> data coming from our customers.
>>>>
>>>> Storing works quite fine, and I started to validate how retrieval does.
>>>> We have two types of that: fetching specific records and bulk retrieval for
>>>> general analysis.
>>>> Fetching single record works like charm. But it is not so with bulk
>>>> fetch.
>>>>
>>>> With a moderately small table of ~2 million records, ~10Gb raw data I
>>>> observed very slow operation (using token(partition key) ranges). It takes
>>>> minutes to perform full retrieval. We tried a couple of configurations
>>>> using virtual machines, real hardware and overall looks like it is not
>>>> possible to all table data in a reasonable time (by reasonable I mean that
>>>> since we have 1Gbit network 10Gb can be transferred in a couple of minutes
>>>> from one server to another and when we have 10+ cassandra servers and 10+
>>>> spark executors total time should be even smaller).
>>>>
>>>> I tried datastax spark connector. Also I wrote a simple test case using
>>>> datastax java driver and see how fetch of 10k records takes ~10s so I
>>>> assume that "sequential" scan will take 200x more time, equals ~30 minutes.
>>>>
>>>> May be we are totally wrong trying to use Cassandra this way?
>>>>
>>>> --
>>>>
>>>> Best Regards,
>>>>
>>>>
>>>> *Alexander Kotelnikov*
>>>>
>>>> *Team Lead*
>>>>
>>>> DIGINETICA
>>>> Retail Technology Company
>>>>
>>>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>>>
>>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Best Regards,
>>
>>
>> *Alexander Kotelnikov*
>>
>> *Team Lead*
>>
>> DIGINETICA
>> Retail Technology Company
>>
>> m: +7.921.915.06.28 <+7%20921%20915-06-28>
>>
>> *www.diginetica.com <http://www.diginetica.com/>*
>>
>
>


-- 

Best Regards,


*Alexander Kotelnikov*

*Team Lead*

DIGINETICA
Retail Technology Company

m: +7.921.915.06.28

*www.diginetica.com <http://www.diginetica.com/>*

Re: Full table scan with cassandra

Posted by Dor Laor <do...@scylladb.com>.
On Thu, Aug 17, 2017 at 9:36 AM, Alex Kotelnikov <
alex.kotelnikov@diginetica.com> wrote:

> Dor,
>
> I believe, I tried it in many ways and the result is quite disappointing.
> I've run my scans on 3 different clusters, one of which was using on VMs
> and I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to see,
> how this affects the performance.
>
> I also generated the flow from spark cluster ranging from 4 to 40 parallel
> tasks as well as just multi-threaded client.
>
> The surprise is that trivial fetch of all records using token ranges takes
> pretty much the same time in all setups.
>
> The only beneficial thing I've learned is that it is much more efficient
> to create a MATERIALIZED VIEW than to filter (even using secondary index).
>
> Say, I have a typical dataset, around 3Gb of data, 1M records. And I have
> a trivial scan practice:
>
> String.format("SELECT token(user_id), user_id, events FROM user_events
> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
> token(user_id) < %d ", end) : "")
>

Is user_id the primary key? It looks like this query will just go to the
cluster and hit a random coordinator each time.
C* doesn't store subsequent tokens on the same node; keys are hashed.
The idea of a parallel cluster scan is to go directly to all nodes in
parallel and query each for the hashed keys it owns.
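
As an illustration of that idea (a simplified sketch with hypothetical
names; it assumes you already have the ring's end-token -> endpoint map,
e.g. parsed from 'nodetool describering', and that subranges do not cross
ownership borders):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Group token subranges by the node that owns them, given the ring as a
// map of ownership-range end token -> endpoint. A node owning end token
// t holds (previous_t, t], so the owner of a subrange that stays inside
// one ownership range is the smallest ring token >= the subrange's end.
// The client can then query each node directly, in parallel, for the
// ranges it owns instead of going through a random coordinator.
class RangeOwners {
    static Map<String, List<long[]>> byOwner(List<long[]> ranges,
                                             TreeMap<Long, String> ring) {
        Map<String, List<long[]>> out = new HashMap<>();
        for (long[] r : ranges) {
            Map.Entry<Long, String> e = ring.ceilingEntry(r[1]);
            String owner = (e != null) ? e.getValue()
                : ring.firstEntry().getValue(); // wrap around the ring
            out.computeIfAbsent(owner, k -> new ArrayList<>()).add(r);
        }
        return out;
    }
}
```

With replication, each range actually has RF owners; a smarter client (or
the Spark connector) picks one replica per range and spreads the load.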


> I split all tokens into start-end ranges (except for last range, which
> only has start) and query ranges in multiple threads, up to 40.
>
> Whole process takes ~40s on 3 VMs cluster  2+2+4 cores, 16Gb RAM each 1
> virtual disk. And it takes ~30s on real hardware clusters
> 8servers*8cores*32Gb. Level of the concurrency does not matter pretty much
> at all. Util it is too high or too low.
> Size of tokens range matters, but here I see the rule "make it larger, but
> avoid cassandra timeouts".
> I also tried the Spark connector to validate that my multi-threaded test app
> is not the bottleneck. It is not.
>
> I expected some kind of elasticity, but I see none. It feels like I am doing
> something wrong...
>
>
>
> On 17 August 2017 at 00:19, Dor Laor <do...@scylladb.com> wrote:
>
>> Hi Alex,
>>
>> You probably didn't get the parallelism right. A serial scan has
>> a parallelism of one. If the parallelism isn't large enough, performance
>> will be slow.
>> If parallelism is too large, Cassandra and the disk will thrash and have
>> too many context switches.
>>
>> So you need to find your cluster's sweet spot. We documented the procedure
>> to do it in this blog:
>> http://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>> and the results are here:
>> http://www.scylladb.com/2017/03/28/parallel-efficient-full-table-scan-scylla/
>> The algorithm should translate to Cassandra but you'll have to use
>> different rules of thumb.
>>
>> Best,
>> Dor
>>
>>
>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>> alex.kotelnikov@diginetica.com> wrote:
>>
>>> Hey,
>>>
>>> we are trying Cassandra as an alternative storage for a huge stream of
>>> data coming from our customers.
>>>
>>> Storing works quite fine, and I started to validate how retrieval does.
>>> We have two types of that: fetching specific records and bulk retrieval for
>>> general analysis.
>>> Fetching single record works like charm. But it is not so with bulk
>>> fetch.
>>>
>>> With a moderately small table of ~2 million records (~10GB of raw data) I
>>> observed very slow operation (using token(partition key) ranges). It takes
>>> minutes to perform a full retrieval. We tried a couple of configurations
>>> using virtual machines and real hardware, and overall it looks like it is
>>> not possible to retrieve all table data in a reasonable time (by reasonable
>>> I mean that since we have a 1Gbit network, 10GB can be transferred in a
>>> couple of minutes from one server to another, and since we have 10+
>>> Cassandra servers and 10+ Spark executors the total time should be even
>>> smaller).
>>>
>>> I tried the DataStax Spark connector. I also wrote a simple test case
>>> using the DataStax Java driver and saw that a fetch of 10k records takes
>>> ~10s, so I assume a "sequential" scan will take 200x more time, i.e. ~30
>>> minutes.
>>>
>>> Maybe we are totally wrong trying to use Cassandra this way?
>>>
>>> --
>>>
>>> Best Regards,
>>>
>>>
>>> *Alexander Kotelnikov*
>>>
>>> *Team Lead*
>>>
>>> DIGINETICA
>>> Retail Technology Company
>>>
>>> m: +7.921.915.06.28
>>>
>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>
>>
>>
>
>
> --
>
> Best Regards,
>
>
> *Alexander Kotelnikov*
>
> *Team Lead*
>
> DIGINETICA
> Retail Technology Company
>
> m: +7.921.915.06.28
>
> *www.diginetica.com <http://www.diginetica.com/>*
>

Re: Full table scan with cassandra

Posted by Alex Kotelnikov <al...@diginetica.com>.
Dor,

I believe I tried it in many ways, and the result is quite disappointing.
I've run my scans on 3 different clusters, one of which was running on VMs
so I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to see
how this affects the performance.

I also generated the load from a Spark cluster with 4 to 40 parallel tasks,
as well as from a plain multi-threaded client.

The surprise is that trivial fetch of all records using token ranges takes
pretty much the same time in all setups.

The only beneficial thing I've learned is that it is much more efficient to
create a MATERIALIZED VIEW than to filter (even using a secondary index).

Say I have a typical dataset: around 3GB of data, 1M records. And I have a
trivial scan query:

String.format("SELECT token(user_id), user_id, events FROM user_events
WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
token(user_id) < %d ", end) : "")

I split all tokens into start-end ranges (except for last range, which only
has start) and query ranges in multiple threads, up to 40.
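The splitting step described above can be sketched as follows. This is a minimal, self-contained version: the token bounds are those of the Murmur3 partitioner (-2^63 to 2^63-1), while the class name and the query assembly are illustrative:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenSplitter {
    // Murmur3 partitioner token space: [-2^63, 2^63 - 1].
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /** Split the full token space into n contiguous [start, end) ranges;
     *  the last range has only a start, matching the query above. */
    static List<long[]> split(int n) {
        BigInteger span = MAX.subtract(MIN).add(BigInteger.ONE);
        BigInteger step = span.divide(BigInteger.valueOf(n));
        List<long[]> ranges = new ArrayList<>();
        BigInteger start = MIN;
        for (int i = 0; i < n; i++) {
            BigInteger end = (i == n - 1) ? null : start.add(step);
            ranges.add(end == null
                    ? new long[]{start.longValueExact()}
                    : new long[]{start.longValueExact(), end.longValueExact()});
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each range becomes one query, to be executed by one worker thread.
        for (long[] r : split(4)) {
            String q = "SELECT token(user_id), user_id, events FROM user_events"
                     + " WHERE token(user_id) >= " + r[0]
                     + (r.length == 2 ? " AND token(user_id) < " + r[1] : "");
            System.out.println(q);
        }
    }
}
```

Evenly sized ranges are only a starting point; data is rarely uniform across the token space, so in practice you may want many more ranges than threads so slow ranges don't dominate.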

The whole process takes ~40s on a 3-VM cluster (2+2+4 cores, 16GB RAM, one
virtual disk each). And it takes ~30s on a real hardware cluster
(8 servers * 8 cores * 32GB). The level of concurrency barely matters at all,
unless it is too high or too low.
The size of the token ranges matters, but here I see the rule "make it larger,
but avoid Cassandra timeouts".
I also tried the Spark connector to validate that my multi-threaded test app
is not the bottleneck. It is not.

I expected some kind of elasticity, but I see none. It feels like I am doing
something wrong...



On 17 August 2017 at 00:19, Dor Laor <do...@scylladb.com> wrote:

> Hi Alex,
>
> You probably didn't get the parallelism right. A serial scan has
> a parallelism of one. If the parallelism isn't large enough, performance
> will be slow.
> If parallelism is too large, Cassandra and the disk will thrash and have too
> many context switches.
>
> So you need to find your cluster's sweet spot. We documented the procedure
> to do it in this blog:
> http://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
> and the results are here:
> http://www.scylladb.com/2017/03/28/parallel-efficient-full-table-scan-scylla/
> The algorithm should translate to Cassandra but you'll have to use
> different rules of thumb.
>
> Best,
> Dor
>
>
> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
> alex.kotelnikov@diginetica.com> wrote:
>
>> Hey,
>>
>> we are trying Cassandra as an alternative storage for a huge stream of data
>> coming from our customers.
>>
>> Storing works quite fine, and I started to validate how retrieval does.
>> We have two types of that: fetching specific records and bulk retrieval for
>> general analysis.
>> Fetching single record works like charm. But it is not so with bulk fetch.
>>
>> With a moderately small table of ~2 million records (~10GB of raw data) I
>> observed very slow operation (using token(partition key) ranges). It takes
>> minutes to perform a full retrieval. We tried a couple of configurations
>> using virtual machines and real hardware, and overall it looks like it is
>> not possible to retrieve all table data in a reasonable time (by reasonable
>> I mean that since we have a 1Gbit network, 10GB can be transferred in a
>> couple of minutes from one server to another, and since we have 10+
>> Cassandra servers and 10+ Spark executors the total time should be even
>> smaller).
>>
>> I tried the DataStax Spark connector. I also wrote a simple test case using
>> the DataStax Java driver and saw that a fetch of 10k records takes ~10s, so
>> I assume a "sequential" scan will take 200x more time, i.e. ~30 minutes.
>>
>> Maybe we are totally wrong trying to use Cassandra this way?
>>
>> --
>>
>> Best Regards,
>>
>>
>> *Alexander Kotelnikov*
>>
>> *Team Lead*
>>
>> DIGINETICA
>> Retail Technology Company
>>
>> m: +7.921.915.06.28
>>
>> *www.diginetica.com <http://www.diginetica.com/>*
>>
>
>


-- 

Best Regards,


*Alexander Kotelnikov*

*Team Lead*

DIGINETICA
Retail Technology Company

m: +7.921.915.06.28

*www.diginetica.com <http://www.diginetica.com/>*

Re: Full table scan with cassandra

Posted by Dor Laor <do...@scylladb.com>.
Hi Alex,

You probably didn't get the parallelism right. A serial scan has
a parallelism of one. If the parallelism isn't large enough, performance will
be slow.
If parallelism is too large, Cassandra and the disk will thrash and have too
many context switches.

So you need to find your cluster's sweet spot. We documented the procedure
to do it in this blog:
http://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
and the results are here:
http://www.scylladb.com/2017/03/28/parallel-efficient-full-table-scan-scylla/
The algorithm should translate to Cassandra but you'll have to use
different rules of thumb.
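The sweet-spot search can be sketched as a simple sweep over concurrency levels. This is a hedged outline, not the exact procedure from the blog posts; fetchRange is a hypothetical stub standing in for the real per-token-range query:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrencySweep {
    // Stand-in for the real per-token-range fetch; replace with a driver call.
    static void fetchRange(int range) throws InterruptedException {
        Thread.sleep(1); // simulate I/O latency
    }

    /** Time a full "scan" of the given number of token ranges at one parallelism level. */
    static long scanMillis(int ranges, int parallelism) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        long t0 = System.nanoTime();
        List<Future<?>> futures = new ArrayList<>();
        for (int r = 0; r < ranges; r++) {
            final int range = r;
            futures.add(pool.submit(() -> { fetchRange(range); return null; }));
        }
        for (Future<?> f : futures) f.get(); // wait for every range to finish
        pool.shutdown();
        return (System.nanoTime() - t0) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        // Sweep parallelism levels; pick the one with the best wall-clock time.
        for (int p : new int[]{1, 4, 16, 64}) {
            System.out.println("parallelism " + p + ": " + scanMillis(256, p) + " ms");
        }
    }
}
```

Against a real cluster the curve typically improves up to some level and then degrades, which is the thrashing regime described above; the knee of that curve is the sweet spot.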

Best,
Dor


On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
alex.kotelnikov@diginetica.com> wrote:

> Hey,
>
> we are trying Cassandra as an alternative storage for a huge stream of data
> coming from our customers.
>
> Storing works quite fine, and I started to validate how retrieval does. We
> have two types of that: fetching specific records and bulk retrieval for
> general analysis.
> Fetching single record works like charm. But it is not so with bulk fetch.
>
> With a moderately small table of ~2 million records (~10GB of raw data) I
> observed very slow operation (using token(partition key) ranges). It takes
> minutes to perform a full retrieval. We tried a couple of configurations
> using virtual machines and real hardware, and overall it looks like it is
> not possible to retrieve all table data in a reasonable time (by reasonable
> I mean that since we have a 1Gbit network, 10GB can be transferred in a
> couple of minutes from one server to another, and since we have 10+
> Cassandra servers and 10+ Spark executors the total time should be even
> smaller).
>
> I tried the DataStax Spark connector. I also wrote a simple test case using
> the DataStax Java driver and saw that a fetch of 10k records takes ~10s, so
> I assume a "sequential" scan will take 200x more time, i.e. ~30 minutes.
>
> Maybe we are totally wrong trying to use Cassandra this way?
>
> --
>
> Best Regards,
>
>
> *Alexander Kotelnikov*
>
> *Team Lead*
>
> DIGINETICA
> Retail Technology Company
>
> m: +7.921.915.06.28
>
> *www.diginetica.com <http://www.diginetica.com/>*
>