Posted to commits@cassandra.apache.org by "Stefania (JIRA)" <ji...@apache.org> on 2016/07/30 02:46:21 UTC

[jira] [Commented] (CASSANDRA-9259) Bulk Reading from Cassandra

    [ https://issues.apache.org/jira/browse/CASSANDRA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400394#comment-15400394 ] 

Stefania commented on CASSANDRA-9259:
-------------------------------------

Now that CASSANDRA-11521 is ready for review, I've repeated the Spark benchmark defined by CASSANDRA-11542 using schema 1:

{code}
CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, PRIMARY KEY (id, timestamp))
{code}

and schema 3:

{code}
CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY (id, timestamp))
{code}

The benchmark measures how many seconds it takes to count rows and to find the maximum of two columns for each row, where rows are retrieved either via Spark RDDs or Data Frames (DFs). The most significant difference between RDD and DF tests is that in the DF tests only the two columns of interest to the Spark query are retrieved, whilst in the RDD tests the entire data set is retrieved. The data is either stored in Cassandra or in HDFS using CSV or Parquet files.

More details on the benchmark are available [here|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15249213&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15249213] and the code is available [here|https://github.com/stef1927/spark-load-perf].
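As a rough illustration of what each test computes, here is a minimal plain-Python sketch. It is not the benchmark code (that is in the repository linked above, which uses Spark). The {{Row}} type, the choice of {{val1}} and {{val2}} as the two columns, and the reduction of the per-row maxima to a single overall value are all assumptions made for illustration.

```python
from typing import Iterable, NamedTuple, Optional, Tuple


class Row(NamedTuple):
    """Hypothetical subset of a schema 1 row: the key plus two value columns."""
    id: str
    timestamp: int
    val1: int
    val2: int


def count_and_max(rows: Iterable[Row]) -> Tuple[int, Optional[int]]:
    """Count the rows and track the largest per-row max(val1, val2) seen."""
    count = 0
    overall: Optional[int] = None
    for row in rows:
        count += 1
        row_max = max(row.val1, row.val2)  # maximum of the two columns for this row
        overall = row_max if overall is None else max(overall, row_max)
    return count, overall


if __name__ == "__main__":
    sample = [Row("a", 1, 3, 7), Row("a", 2, 9, 2), Row("b", 1, -1, -5)]
    print(count_and_max(sample))
```

Only two columns are needed for this computation, which is why the DF tests, which project just those columns, move less data than the RDD tests.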

Here is the comparison with the results of the benchmark that was run on 6th May with 15M rows, as described in [this comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15273884&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15273884]. We can see that the final results are consistent with the proof of concept, which was presented at the Cassandra NGCC conference.

!before_after.jpg!

* C* old: is TRUNK with no optimizations (at c662d876b95d67a911dfe549d8a0d38ee6fbb904), and the Spark Connector without SPARK-C383
* C* POC: is the [proof-of-concept patch|https://github.com/stef1927/cassandra/commits/9259], and the Spark Connector with an [earlier version|https://github.com/stef1927/spark-cassandra-connector/commits/9259] of SPARK-C383 
* C* async: is the CASSANDRA-11521 patch, with results delivered to the client via the new asynchronous paging mechanism
* C* sync: is the CASSANDRA-11521 patch, with results delivered to the client via the existing synchronous paging mechanism

Here are the results run over several incremental data sets at 15M, 30M, 60M and 120M rows with 256 VNODES:

!256_vnodes.jpg!

Below are the results run over several incremental data sets at 15M, 30M, 60M and 120M rows without VNODES:

!no_vnodes.jpg!


The raw data is attached [^spark_benchmark_raw_data.zip].

h5. Conclusions

* Overall the duration of the 15M row test was improved by 65% (from about 40 to 14 seconds) for schema 1 and by 56% (from 23 to 10 seconds) for schema 3.

* The new asynchronous paging mechanism significantly outperforms the existing mechanism with large data sets. For example, for schema 1 and 120M rows, it is approximately 30% faster. To achieve this, however, the driver must reserve a connection per asynchronous paging request; sharing connections degrades performance significantly and makes it no better than the existing mechanism.

* CSV still outperforms C* for schema 1 RDD tests. However, for DF tests and schema 3 RDD tests, C* is now on par with or faster than CSV. This indicates that the number of cells in CQL rows continues to impact performance.

* Parquet is in a league of its own due to its efficient columnar format. It should however be noted that it may be storing the row count in metadata. A more meaningful benchmark could have been obtained had we excluded the row count from the time measurements.
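
The percentages in the first conclusion can be checked from the durations quoted there. Below is a minimal Python sketch of that arithmetic; {{improvement_pct}} is a hypothetical helper, and the inputs are the rounded values from the text, not the raw benchmark data.

```python
def improvement_pct(before_s: float, after_s: float) -> float:
    """Percentage reduction in duration going from before_s to after_s."""
    if before_s <= 0:
        raise ValueError("before_s must be positive")
    return (before_s - after_s) / before_s * 100.0


# Rounded durations quoted in the conclusions (seconds), not the raw data.
schema1 = improvement_pct(40, 14)
schema3 = improvement_pct(23, 10)

print(f"schema 1: {schema1:.1f}%")
print(f"schema 3: {schema3:.1f}%")
```

With these rounded inputs the results come out at 65.0% and about 56.5%, in line with the quoted 65% and 56%.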

> Bulk Reading from Cassandra
> ---------------------------
>
>                 Key: CASSANDRA-9259
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Compaction, CQL, Local Write-Read Paths, Streaming and Messaging, Testing
>            Reporter:  Brian Hess
>            Assignee: Stefania
>            Priority: Critical
>             Fix For: 3.x
>
>         Attachments: 256_vnodes.jpg, before_after.jpg, bulk-read-benchmark.1.html, bulk-read-jfr-profiles.1.tar.gz, bulk-read-jfr-profiles.2.tar.gz, no_vnodes.jpg, spark_benchmark_raw_data.zip
>
>
> This ticket is following on from the 2015 NGCC.  This ticket is designed to be a place for discussing and designing an approach to bulk reading.
> The goal is to have a bulk reading path for Cassandra.  That is, a path optimized to grab a large portion of the data for a table (potentially all of it).  This is a core element in the Spark integration with Cassandra, and the speed at which Cassandra can deliver bulk data to Spark is limiting the performance of Spark-plus-Cassandra operations.  This is especially of importance as Cassandra will (likely) leverage Spark for internal operations (for example CASSANDRA-8234).
> The core CQL to consider is the following:
> SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND Token(partitionKey) <= Y
> Here, we choose X and Y to be contained within one token range (perhaps considering the primary range of a node without vnodes, for example).  This query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk operations via Spark (or other processing frameworks - ETL, etc).  There are a few causes (e.g., inefficient paging).
> There are a few approaches that could be considered.  First, we consider a new "Streaming Compaction" approach.  The key observation here is that a bulk read from Cassandra is a lot like a major compaction, though instead of outputting a new SSTable we would output CQL rows to a stream/socket/etc.  This would be similar to a CompactionTask, but would strip out some unnecessary things in there (e.g., some of the indexing, etc). Predicates and projections could also be encapsulated in this new "StreamingCompactionTask", for example.
> Another approach would be an alternate storage format.  For example, we might employ Parquet (just as an example) to store the same data as in the primary Cassandra storage (aka SSTables).  This is akin to Global Indexes (an alternate storage of the same data optimized for a particular query).  Then, Cassandra can choose to leverage this alternate storage for particular CQL queries (e.g., range scans).
> These are just 2 suggestions to get the conversation going.
> One thing to note is that it will be useful to have this storage segregated by token range so that when you extract via these mechanisms you do not get a replication-factor number of copies of the data.  That will certainly be an issue for some Spark operations (e.g., counting).  Thus, we will want per-token-range storage (even for single disks), so this will likely leverage CASSANDRA-6696 (though, we'll want to also consider the single disk case).
> It is also worth discussing what the success criteria are here.  It is unlikely to be as fast as EDW or HDFS performance (though, that is still a good goal), but being within some percentage of that performance should be set as success.  For example, 2x as long as doing bulk operations on HDFS with similar node count/size/etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)