Posted to user@cassandra.apache.org by Mikhail Tsaplin <ts...@gmail.com> on 2018/04/27 14:59:30 UTC

Adding new nodes to cluster to speedup pending compactions

Hi,
I have a five-node C* cluster suffering from a large number of pending
compaction tasks: 1) 571; 2) 91; 3) 367; 4) 22; 5) 232

Initially, it held one big table (table_a). With Spark, I read that
table, extended its data, and stored the result in a second table,
table_b. After this copying/extending process, the number of pending
compaction tasks in the cluster grew. From nodetool cfstats (see output
at the bottom): table_a has 20 SSTables and table_b has 18219.

As I understand it, table_b has so many SSTables because the data was
transferred from one table to the other within a short time, and
eventually this table will be compacted. But now I have to read all the
data from table_b and send it to Elasticsearch. When Spark reads this
table, some Cassandra nodes die because of OOM.

I think that once compaction completes, the Spark reading job will work
fine.

The question is: how can I speed up the compaction process? If I add
another two nodes to the cluster, will compaction finish faster? Or will
the data be copied to the new nodes while compaction continues on the
original set of SSTables?


Nodetool cfstats output:

                Table: table_a
                SSTable count: 20
                Space used (live): 1064889308052
                Space used (total): 1064889308052
                Space used by snapshots (total): 0
                Off heap memory used (total): 1118106937
                SSTable Compression Ratio: 0.12564594959566894
                Number of keys (estimate): 56238959
                Memtable cell count: 76824
                Memtable data size: 115531402
                Memtable off heap memory used: 0
                Memtable switch count: 17
                Local read count: 0
                Local read latency: NaN ms
                Local write count: 77308
                Local write latency: 0.045 ms
                Pending flushes: 0
                Bloom filter false positives: 0
                Bloom filter false ratio: 0.00000
                Bloom filter space used: 120230328
                Bloom filter off heap memory used: 120230168
                Index summary off heap memory used: 2837249
                Compression metadata off heap memory used: 995039520
                Compacted partition minimum bytes: 1110
                Compacted partition maximum bytes: 52066354
                Compacted partition mean bytes: 133152
                Average live cells per slice (last five minutes): NaN
                Maximum live cells per slice (last five minutes): 0
                Average tombstones per slice (last five minutes): NaN
                Maximum tombstones per slice (last five minutes): 0


nodetool cfstats table_b
Keyspace: dump_es
        Read Count: 0
        Read Latency: NaN ms.
        Write Count: 0
        Write Latency: NaN ms.
        Pending Flushes: 0
                Table: table_b
                SSTable count: 18219
                Space used (live): 1316641151665
                Space used (total): 1316641151665
                Space used by snapshots (total): 0
                Off heap memory used (total): 3863604976
                SSTable Compression Ratio: 0.20387645535477916
                Number of keys (estimate): 712032622
                Memtable cell count: 0
                Memtable data size: 0
                Memtable off heap memory used: 0
                Memtable switch count: 0
                Local read count: 0
                Local read latency: NaN ms
                Local write count: 0
                Local write latency: NaN ms
                Pending flushes: 0
                Bloom filter false positives: 0
                Bloom filter false ratio: 0.00000
                Bloom filter space used: 2382971488
                Bloom filter off heap memory used: 2742320056
                Index summary off heap memory used: 371500752
                Compression metadata off heap memory used: 749784168
                Compacted partition minimum bytes: 771
                Compacted partition maximum bytes: 1629722
                Compacted partition mean bytes: 3555
                Average live cells per slice (last five minutes): 132.375
                Maximum live cells per slice (last five minutes): 149
                Average tombstones per slice (last five minutes): 1.0
                Maximum tombstones per slice (last five minutes): 1
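
To put the two SSTable counts in perspective, the figures above can be turned into an average SSTable size per table. This is only a rough sketch: it assumes the "Space used (live)" value and the SSTable count refer to the same node, and the helper name mean_sstable_mb is made up for illustration.

```python
# Figures copied from the cfstats output above (live bytes and SSTable counts).
tables = {
    "table_a": {"live_bytes": 1_064_889_308_052, "sstables": 20},
    "table_b": {"live_bytes": 1_316_641_151_665, "sstables": 18_219},
}


def mean_sstable_mb(live_bytes: int, sstables: int) -> float:
    """Average on-disk size of one SSTable in MB (10**6 bytes)."""
    return live_bytes / sstables / 1e6


for name, t in tables.items():
    size = mean_sstable_mb(t["live_bytes"], t["sstables"])
    print(f"{name}: {size:,.0f} MB per SSTable")
# table_a: 53,244 MB per SSTable
# table_b: 72 MB per SSTable
```

So table_b holds slightly more data than table_a, but spread over SSTables that are on average several hundred times smaller.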


------------------


I logged the CQL requests coming from Spark and checked how one such
request performs: it fetches 8075 rows (59 MB of data) in 155 s (see the
check output below).

$ date; echo 'SELECT "scan_id", "snapshot_id", "scan_doc", "snapshot_doc"
FROM "dump_es"."table_b" WHERE token("scan_id") > 946122293981930504 AND
token("scan_id") <= 946132293981930504 ALLOW FILTERING;' | cqlsh --request-timeout=3600 | wc ; date


Fri Apr 27 13:32:55 UTC 2018
   8076   61191 59009831
Fri Apr 27 13:35:30 UTC 2018
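
As a rough worked example, the timestamps and wc counts above translate into the following read rate for a single token-range query. The numbers are taken from the output as posted; the byte count is the size of the cqlsh text output, so it overstates the actual payload somewhat.

```python
from datetime import datetime

# Timestamps printed by the two `date` calls around the query.
start = datetime(2018, 4, 27, 13, 32, 55)
end = datetime(2018, 4, 27, 13, 35, 30)
elapsed_s = (end - start).total_seconds()

# From `wc`: 59009831 bytes of cqlsh output (includes table formatting).
output_bytes = 59_009_831
rows = 8075  # row count as reported in the post

# Width of the token range requested by the query.
token_span = 946132293981930504 - 946122293981930504

print(f"elapsed: {elapsed_s:.0f} s")                          # elapsed: 155 s
print(f"rows/s:  {rows / elapsed_s:.1f}")                     # rows/s:  52.1
print(f"MB/s:    {output_bytes / elapsed_s / 1e6:.2f}")       # MB/s:    0.38
print(f"tokens:  {token_span:.0e}")                           # tokens:  1e+13
```

That is well under 1 MB/s for one range scan, which matches the impression that reads on table_b are very slow in its current state.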

Re: Adding new nodes to cluster to speedup pending compactions

Posted by Mikhail Tsaplin <ts...@gmail.com>.
Thanks, everybody.
Raising the compaction throughput improved compaction performance. On the
overloaded cluster, the number of SSTables dropped from ~16K to ~7K, so I
can wait until it stabilizes.

PS
This task is a one-time process: I am upgrading Elasticsearch from v2 to
v6, and since I already have the same data in Cassandra, I decided to load
it from Cassandra. I also needed to merge this data with other related
documents, and Cassandra's static columns help greatly here by allowing
one-to-many relations to be stored in a single table. That is why I copied
the unjoined data from table_a to table_b.

> On 28 Apr 2018, at 3:54 am, Mikhail Tsaplin <ts...@gmail.com> wrote:
>
> The cluster has 5 nodes of AWS type d2.xlarge (32 GB RAM, attached SSD
> disks), running Cassandra 3.0.9.
> I increased the compaction throughput from 16 to 200, and the remaining
> time of the active compactions decreased.
> What will happen if another node joins the cluster? Will the existing
> nodes move part of their SSTables to the new node unchanged, so that
> compaction time is reduced?
>
>
>
> $ nodetool cfstats -H  dump_es
>
>
> Keyspace: table_b
>         Read Count: 0
>         Read Latency: NaN ms.
>         Write Count: 0
>         Write Latency: NaN ms.
>         Pending Flushes: 0
>                 Table: table_b
>                 SSTable count: 18155
>                 Space used (live): 1.2 TB
>                 Space used (total): 1.2 TB
>                 Space used by snapshots (total): 0 bytes
>                 Off heap memory used (total): 3.62 GB
>                 SSTable Compression Ratio: 0.20371982719658258
>                 Number of keys (estimate): 712032622
>                 Memtable cell count: 0
>                 Memtable data size: 0 bytes
>                 Memtable off heap memory used: 0 bytes
>                 Memtable switch count: 0
>                 Local read count: 0
>                 Local read latency: NaN ms
>                 Local write count: 0
>                 Local write latency: NaN ms
>                 Pending flushes: 0
>                 Bloom filter false positives: 0
>                 Bloom filter false ratio: 0.00000
>                 Bloom filter space used: 2.22 GB
>                 Bloom filter off heap memory used: 2.56 GB
>                 Index summary off heap memory used: 357.51 MB
>                 Compression metadata off heap memory used: 724.97 MB
>                 Compacted partition minimum bytes: 771 bytes
>                 Compacted partition maximum bytes: 1.55 MB
>                 Compacted partition mean bytes: 3.47 KB
>                 Average live cells per slice (last five minutes): NaN
>                 Maximum live cells per slice (last five minutes): 0
>                 Average tombstones per slice (last five minutes): NaN
>                 Maximum tombstones per slice (last five minutes): 0
>
>
> 2018-04-27 22:21 GMT+07:00 Nicolas Guyomar <ni...@gmail.com>:
>
>> Hi Mikhail,
>>
>> Could you please provide:
>> - your cluster version/topology (number of nodes, CPU, RAM available, etc.)
>> - what kind of underlying storage you are using
>> - cfstats output with the -H option, because I'm never sure I'm
>> converting bytes => GB correctly
>>
>> You are storing 1 TB per node, so long-running compaction is not really a
>> surprise. You can start by tuning the number of concurrent compaction
>> threads and the compaction throughput.
>>
>>

Re: Adding new nodes to cluster to speedup pending compactions

Posted by Evelyn Smith <u5...@gmail.com>.
Hi Mikhail,

There are a few ways to speed up compactions in the short term:
- nodetool setcompactionthroughput 0
This unthrottles compactions, but doing so puts you at risk of high latency while compactions are running.
- nodetool setconcurrentcompactors 2
You usually want to set this to the lower of the number of disks or cores. If you are using SSDs, use the number of cores; it looks like d2.xlarge has 2 virtual cores.
- nodetool disablebinary
You can use this to stop an individual node from acting as a coordinator.
This lets the node focus on catching up on compactions. Use it if one or two nodes have significantly more pending compactions than the rest of the cluster.
- nodetool disablegossip / disablethrift
Same logic as above, except that the node also stops accepting writes. You can only do this for ~2-2.5 hours, or you risk inconsistent data by missing the hinted handoff window.
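
The time limit in the last item can be sketched as a small calculation. This assumes max_hint_window_in_ms is still at the stock cassandra.yaml value of 3 hours (check your own nodes); the 30 minute safety margin and the function name are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: max_hint_window_in_ms is at the stock cassandra.yaml value of
# 10800000 ms (3 hours). Verify this on your own nodes.
MAX_HINT_WINDOW = timedelta(milliseconds=10_800_000)

# Hypothetical safety margin, chosen to match the ~2.5 hour advice above.
SAFETY_MARGIN = timedelta(minutes=30)


def reenable_deadline(disabled_at: datetime) -> datetime:
    """Latest time to re-enable the node while hints should still cover the gap."""
    return disabled_at + MAX_HINT_WINDOW - SAFETY_MARGIN


disabled_at = datetime(2018, 4, 28, 1, 0, tzinfo=timezone.utc)  # example timestamp
print(reenable_deadline(disabled_at).isoformat())
# 2018-04-28T03:30:00+00:00
```

Writes that arrive after the hint window has passed are no longer hinted for the node, so they have to be recovered by a repair instead.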

Long-term solutions:
- Consider switching instance type
The nodes you are using are storage optimised. They have very little processing power, which is needed to process compactions. Also, the AWS documentation seems to suggest that this instance type has HDDs, not SSDs. Are you sure you actually have SSDs? That makes a big difference.
- Add nodes
The data will be redistributed over more nodes, and each node will be responsible for fewer compactions (less data ~= fewer compactions).
- If it’s a batch load, make Spark do it
My impression is that you want to batch load from Cassandra to Elasticsearch after batch loading from Spark to Cassandra. If that is the case, why not get Spark to do the batch load to Elasticsearch, since it already has the data? (Maybe I’m misinterpreting what you are doing.)
- Consider throttling Spark when it batch loads to Cassandra
If Cassandra gets overwhelmed it can start acting up. Keep an eye out for lots of undersized SSTables; they might be a sign that Cassandra is running out of memory during the batch load and flushing lots of small memtables to disk as SSTables to conserve memory.
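
For the "Add nodes" option, a back-of-the-envelope estimate of the per-node data volume after going from 5 to 7 nodes could look like this. It assumes every node currently holds about the same amount of table_b data as the node in the cfstats output and that token ownership ends up evenly balanced; the function name is made up for illustration.

```python
def per_node_share(bytes_per_node: float, old_nodes: int, new_nodes: int) -> float:
    """Expected data per node after rebalancing, assuming even token ownership."""
    cluster_total = bytes_per_node * old_nodes
    return cluster_total / new_nodes


# "Space used (live)" for table_b from the cfstats output, in bytes.
table_b_per_node = 1_316_641_151_665

after = per_node_share(table_b_per_node, old_nodes=5, new_nodes=7)
reduction = 1 - after / table_b_per_node
print(f"{after / 1e9:.0f} GB per node, {reduction:.0%} less than before")
# 940 GB per node, 29% less than before
```

Note that the existing nodes keep the data they no longer own until nodetool cleanup has been run on them, so the reduction does not show up immediately.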

Some final follow-up questions:
- What is the purpose of this cluster?
Is it to support BAU, run daily analytics, or even an occasional one-time cluster that is spun up for some analysis before being spun down? This info helps a lot in understanding where you can make concessions.
- What is the flow of data and what are the timing requirements?

Cheers,
Eevee.



Re: Adding new nodes to cluster to speedup pending compactions

Posted by Jonathan Haddad <jo...@jonhaddad.com>.
Your compaction time won't improve immediately simply by adding nodes
because the old data still needs to be cleaned up.

What's your end goal?  Why is having a spike in pending compaction tasks
following a massive write an issue?  Are you seeing a dip in performance,
violating an SLA, or do you just not like it?


On Fri, Apr 27, 2018 at 10:54 AM Mikhail Tsaplin <ts...@gmail.com> wrote:

> The cluster has 5 nodes of d2.xlarge AWS type (32GB RAM, Attached SSD
> disks), Cassandra 3.0.9.
> Increased compaction throughput from 16 to 200 - active compaction
> remaining time decreased.
> What will happen if another node will join the cluster? - will former
> nodes move part of theirs SSTables to the new node unchanged and compaction
> time will be reduced?
>
>
>
> $ nodetool cfstats -H  dump_es
>
>
> Keyspace: table_b
>         Read Count: 0
>         Read Latency: NaN ms.
>         Write Count: 0
>         Write Latency: NaN ms.
>         Pending Flushes: 0
>                 Table: table_b
>                 SSTable count: 18155
>                 Space used (live): 1.2 TB
>                 Space used (total): 1.2 TB
>                 Space used by snapshots (total): 0 bytes
>                 Off heap memory used (total): 3.62 GB
>                 SSTable Compression Ratio: 0.20371982719658258
>                 Number of keys (estimate): 712032622
>                 Memtable cell count: 0
>                 Memtable data size: 0 bytes
>                 Memtable off heap memory used: 0 bytes
>                 Memtable switch count: 0
>                 Local read count: 0
>                 Local read latency: NaN ms
>                 Local write count: 0
>                 Local write latency: NaN ms
>                 Pending flushes: 0
>                 Bloom filter false positives: 0
>                 Bloom filter false ratio: 0.00000
>                 Bloom filter space used: 2.22 GB
>                 Bloom filter off heap memory used: 2.56 GB
>                 Index summary off heap memory used: 357.51 MB
>                 Compression metadata off heap memory used: 724.97 MB
>                 Compacted partition minimum bytes: 771 bytes
>                 Compacted partition maximum bytes: 1.55 MB
>                 Compacted partition mean bytes: 3.47 KB
>                 Average live cells per slice (last five minutes): NaN
>                 Maximum live cells per slice (last five minutes): 0
>                 Average tombstones per slice (last five minutes): NaN
>                 Maximum tombstones per slice (last five minutes): 0
>

Re: Adding new nodes to cluster to speedup pending compactions

Posted by Mikhail Tsaplin <ts...@gmail.com>.
The cluster has 5 nodes of the d2.xlarge AWS type (32 GB RAM, attached SSD
disks), running Cassandra 3.0.9.
I increased the compaction throughput from 16 to 200 MB/s, and the estimated
remaining time of the active compactions decreased.
What will happen if another node joins the cluster? Will the existing nodes
move part of their SSTables to the new node unchanged, so that the compaction
time is reduced?
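
As a rough illustration of why raising the throttle helps, here is a minimal
sketch that computes a lower bound for one throttled pass over table_b's
on-disk data. It assumes the throttle is counted in units of 1024*1024 bytes
per second and that it is the only bottleneck; the function name is made up
for this example. Real compaction usually rewrites the same data more than
once, and the compaction strategy is not stated in this thread, so treat the
numbers as a floor, not a prediction.

```python
# Lower bound on the time for one throttled pass over a table's on-disk data.
# Assumption: the throttle is in MiB/s and nothing else (CPU, disk) limits it.
MIB = 1024 * 1024


def single_pass_hours(data_bytes: int, throughput_mib_per_s: float) -> float:
    """Hours needed to stream data_bytes once at the given throttle."""
    if throughput_mib_per_s <= 0:
        # In Cassandra a value of 0 disables throttling, so no bound applies.
        raise ValueError("throughput must be positive")
    return data_bytes / (throughput_mib_per_s * MIB) / 3600


# "Space used (live)" reported for table_b on one node in this thread.
TABLE_B_BYTES = 1316641151665

for limit in (16, 200):
    hours = single_pass_hours(TABLE_B_BYTES, limit)
    print(f"{limit:>3} MiB/s -> {hours:.1f} h")
```

With the figures above this gives roughly 21.8 hours at 16 and 1.7 hours at
200 for a single pass, which is in line with the remaining time dropping
after the change.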



$ nodetool cfstats -H  dump_es


Keyspace: dump_es
        Read Count: 0
        Read Latency: NaN ms.
        Write Count: 0
        Write Latency: NaN ms.
        Pending Flushes: 0
                Table: table_b
                SSTable count: 18155
                Space used (live): 1.2 TB
                Space used (total): 1.2 TB
                Space used by snapshots (total): 0 bytes
                Off heap memory used (total): 3.62 GB
                SSTable Compression Ratio: 0.20371982719658258
                Number of keys (estimate): 712032622
                Memtable cell count: 0
                Memtable data size: 0 bytes
                Memtable off heap memory used: 0 bytes
                Memtable switch count: 0
                Local read count: 0
                Local read latency: NaN ms
                Local write count: 0
                Local write latency: NaN ms
                Pending flushes: 0
                Bloom filter false positives: 0
                Bloom filter false ratio: 0.00000
                Bloom filter space used: 2.22 GB
                Bloom filter off heap memory used: 2.56 GB
                Index summary off heap memory used: 357.51 MB
                Compression metadata off heap memory used: 724.97 MB
                Compacted partition minimum bytes: 771 bytes
                Compacted partition maximum bytes: 1.55 MB
                Compacted partition mean bytes: 3.47 KB
                Average live cells per slice (last five minutes): NaN
                Maximum live cells per slice (last five minutes): 0
                Average tombstones per slice (last five minutes): NaN
                Maximum tombstones per slice (last five minutes): 0
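
To compare such outputs between runs, it can help to parse them. The sketch
below is only an illustration: the helper name is hypothetical, and the
sample lines are copied from the raw-byte (non -H) table_b output earlier in
this thread so that the values can be read as integers. It derives the
average SSTable size, a figure that nodetool does not print directly.

```python
# A few lines copied from the raw-byte cfstats output earlier in the thread.
SAMPLE = """\
Keyspace: dump_es
        Pending Flushes: 0
                Table: table_b
                SSTable count: 18219
                Space used (live): 1316641151665
                Bloom filter off heap memory used: 2742320056
"""


def parse_cfstats(text: str) -> dict:
    """Map each 'name: value' line to its value (kept as a string)."""
    stats = {}
    for line in text.splitlines():
        name, sep, value = line.strip().partition(": ")
        if sep:  # skip lines that have no 'name: value' shape
            stats[name] = value
    return stats


stats = parse_cfstats(SAMPLE)
sstables = int(stats["SSTable count"])
live_bytes = int(stats["Space used (live)"])
avg_mib = live_bytes / sstables / 1024**2
print(f"average SSTable size: {avg_mib:.1f} MiB")
```

For table_b this works out to about 68.9 MiB per SSTable, that is, many
small files compared with table_a's 20 large ones.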



Re: Adding new nodes to cluster to speedup pending compactions

Posted by Nicolas Guyomar <ni...@gmail.com>.
Hi Mikhail,

Could you please provide:
- your cluster version and topology (number of nodes, CPU, available RAM, etc.)
- what kind of underlying storage you are using
- cfstats output using the -H option, because I'm never sure I'm converting
  bytes to GB correctly

You are storing 1 TB per node, so long-running compactions are not really a
surprise. To begin with, you can tune the number of concurrent compaction
threads and the compaction throughput.
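
For the bytes conversion mentioned above, here is a small sketch. It assumes
1024-based units, which matches the figures in this thread (3555 bytes is
shown as 3.47 KB in the -H output); the function name is made up for this
example, and it always prints two decimals, so it shows "1.20 TB" where
nodetool shows "1.2 TB".

```python
UNITS = ("bytes", "KB", "MB", "GB", "TB")


def human_size(n_bytes: int) -> str:
    """Format a byte count with 1024-based units (an assumption, see above)."""
    value = float(n_bytes)
    unit = UNITS[0]
    for unit in UNITS:
        # Stop at the first unit where the value fits, or at the largest unit.
        if value < 1024 or unit == UNITS[-1]:
            break
        value /= 1024
    if unit == "bytes":
        return f"{n_bytes} bytes"
    return f"{value:.2f} {unit}"


# Raw values taken from the table_b cfstats output in this thread.
for raw in (771, 3555, 1629722, 1316641151665):
    print(f"{raw:>14} -> {human_size(raw)}")
```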

