You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by "Jonathan Ellis (JIRA)" <ji...@apache.org> on 2011/07/15 05:05:00 UTC

[jira] [Created] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Allow taking advantage of multiple cores while compacting a single CF
---------------------------------------------------------------------

                 Key: CASSANDRA-2901
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
             Project: Cassandra
          Issue Type: Improvement
          Components: Core
            Reporter: Jonathan Ellis
            Priority: Minor


Moved from CASSANDRA-1876:

There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.

So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). One thread merging corresponding rows from each input sstable. One thread doing serialize + writing the output. This should give us between 2x and 3x speedup (depending how much doing the merge on another thread than write saves us).

This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.

Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. (If this is a concern, we already have a tunable to limit the number of sstables merged at a time in a single CF.)

IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Yewei Zhang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072691#comment-13072691 ] 

Yewei Zhang commented on CASSANDRA-2901:
----------------------------------------

Yes, you are right. the deserialization part should be done per sstable. In the junit test, I hit the error you just described. 

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0002-parallel-compactions.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080519#comment-13080519 ] 

Hudson commented on CASSANDRA-2901:
-----------------------------------

Integrated in Cassandra #1008 (See [https://builds.apache.org/job/Cassandra/1008/])
    refactor CompactionIterator -> CompactionIterable
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1154635
Files : 
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
* /cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 2901-trunk.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 1.0
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0002-parallel-compaction.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-refactor-CompactionIterator-CompactionIterable.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079590#comment-13079590 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

Found the bug: DeserializedColumnIterator needed to create a new iter on reset().  (So, it was indeed a problem with mixed lazy/nonlazy iteration, specifically a problem with eager deserialize that only showed up when it needed to make multiple passes b/c of the presence of a lazy iterator.)

new patches up.  Incorporated Sylvain's improvements as well, partly in 01 and partly in 02.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-trunk.txt

Committed the CompactionIterable refactoring to trunk.  Attaching latest trunk version, which fixes not closing the Deserializer sources.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 2901-v3.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13066737#comment-13066737 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

That's an interesting idea, but the more I think about it the less convinced I am that it's an easy win.

First of all, the premise that compaction is GC-intensive should be qualified: it can help cause young-gen compactions, but almost none of it will ever be promoted to old gen, which is what most people worry about.  Small rows are compacted quickly enough to not be promoted, and large rows compact column-at-a-time which will also not live long enough to be promoted.  (If you are seeing "medium size" rows get tenured, then consider reduction in_memory_compaction_limit_in_mb.)

Second, it's harder than it looks to actually push compaction out to another process, because you have basically three choices:
- use Runtime.exec or ProcessBuilder
- use JNA and vfork
- run a separate, always-on "compaction daemon" and communicate with it over RMI or other IPC

The first of these is implemented using fork on Linux, which can cause spurious OOMs when running in an environment with overcommit disabled (which is generally accepted as best practice in a server environment). Overcommit aside, copying even just the page table for a largish heap is expensive: http://lwn.net/Articles/360509/ 

vfork allows avoiding copying the parent process's page table, but is obviously not completely portable so we'd have to keep in-process compaction around as a fallback option.

Neither of these makes it easy to communicate back to the parent Cassandra process what cache rows should be invalidated (CASSANDRA-2305). This may be something we can live with (we did for years), but it's a regression nevertheless.

The compaction daemon approach avoids the above problems but adds substantial complexity to implementation.

tl;dr: you're welcome to experiment with it but I don't think it's at all clear yet that the cost/benefit is there.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). One thread merging corresponding rows from each input sstable. One thread doing serialize + writing the output. This should give us between 2x and 3x speedup (depending how much doing the merge on another thread than write saves us).
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. (If this is a concern, we already have a tunable to limit the number of sstables merged at a time in a single CF.)
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Sylvain Lebresne (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073477#comment-13073477 ] 

Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

Comments:
* PCI.Reducer.getCompactedRow unwraps NotifyingSSTableIterators, so their close() function won't be called (as a side note, it doesn't seem like we ever call close() on the SSTableIdentityIterator).
* The MergeTask executor has a bounded queue (and number of threads), so tasks can be rejected. If we want submitters to block when the queue is full and all threads are occupied, we need to reuse the trick of DebuggableThreadPoolExecutor.
* Deserializer uses a queue of size 1 to queue up to 1 row while it deserialize the next one. However, we already queue up rows in the MergeTask executor, so it feels like it would be simple to use direct handoff here. It would make it easier to reason about how many rows are in memory at any given time for instance.
* More generally, the memory blow up is (potentially) much more than the 2x (compared to mono-threaded) in the description of this ticket. I think that right now we may have:
  ** 1 for the row being deserialized
  ** 1 for the row in the Deserialized queue
  ** nbAvailProcessor's for the row in the MergeTask executor queue (each mergeTask can contain up to 'InMemoryCompactionLimit' worth of data)
  ** 1 for the row being merged
  Note that if we really want to get to the (roughly) 2x like in the description of this ticket, we need direct hand-off for both the Deserializer queue *and* the merge executor. I would be fine queuing a few tasks in the merge executor though if that can help with throughput, but I'm not even sure it will.
* MergeTask calls removeDeleted and removeOldShards on the compacted cf, but it is also called in the constructor of PreCompactedRow a little bit later (we should probably remove the occurrence in PreCompactedRow as it's still multi-threaded while in the MergeTask). 
* In PCI.Reducer.getCompactedRow, in the case where inMemory == false, it seems we use the SSTI even for rows that were already read by the Deserializer, we should use the row instead to avoid deserializing twice.

Nitpick:
* In the CompactionIterable (and PCI), we create one Comparator<IColumnIterator> each time instead of having a private static final one (as it is the case prior to this patch). Granted, we don't create compaction tasks quickly enough that it would really matter much, but it seems like a good habit to be nice with the GC :)
* This is due to this patch, but there is a "race" when updating the bytesRead, such that a user could get a 0 bytesRead temporarily in the middle of a big compaction (and bytesRead should probably be volatile since it won't be read for the same thread that write it).

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 2901.patch)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 2901-v2.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13099113#comment-13099113 ] 

Hudson commented on CASSANDRA-2901:
-----------------------------------

Integrated in Cassandra #1082 (See [https://builds.apache.org/job/Cassandra/1082/])
    parallel compaction
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1166255
Files : 
* /cassandra/trunk/CHANGES.txt
* /cassandra/trunk/NEWS.txt
* /cassandra/trunk/conf/cassandra.yaml
* /cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
* /cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
* /cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
* /cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
* /cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 1.0
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-trunk.txt

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-v2.txt

v2 attached.

Summary:

- Extracts common code from CompactionIterator (renamed CompactionIterable) into AbstractCompactionIterable. 
- One Deserializer thread per input sstable performs read + deserialize (a row at a time).
 - The resulting ColumnFamilies are added to a queue, which is fed to the merge Reducer.
 - The merge Reducer creates MergeTasks on a thread-per-core Executor, and returns Future<ColumnFamily> objects, which are turned into PrecompactedRow objects when complete.
 - The main complication is in handling larger-than-memory rows.  When one is encountered, no further deserialization is done until that row is merged and written -- creating a pipeline stall, as it were.  Thus, this is intended to be useful with mostly-in-memory row sizes, but preserves correctness in the face of occasional exceptions.



> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Description: 
Moved from CASSANDRA-1876:

There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.

So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.

This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.

Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.

IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

  was:
Moved from CASSANDRA-1876:

There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.

So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). One thread merging corresponding rows from each input sstable. One thread doing serialize + writing the output. This should give us between 2x and 3x speedup (depending how much doing the merge on another thread than write saves us).

This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.

Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. (If this is a concern, we already have a tunable to limit the number of sstables merged at a time in a single CF.)

IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.2
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0002-parallel-compaction.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072687#comment-13072687 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

bq. serialization is done in the constructor and is handled in getReduced() method. so the serialization is handled in multi threads

It looks to me like it's handled in the same threads that do the merging, so this is not going to saturate the CPU very well since the CPU intensive part (merging) has to wait for the i/o intensive part (deserializing).

It also looks like there's a correctness bug here, in that multiple executor threads can be attempting deserialize at the same time from the same scanner, which will cause problems (SSTS/SSTII are not threadsafe).

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13067752#comment-13067752 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

bq. we could partially parallelize large rows

Thinking about it a little more, CASSANDRA-1608 makes this unnecessary: sstables will be kept small enough that you'll have a max of one large row, per sstable.  (So the multiple simultaneous sstable compactions code that we already have takes care of that.)

So keeping to the plan of only parallelizing in-memory merges is fine.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.2
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-0.8.txt

... and backported to 0.8.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Sylvain Lebresne (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13098843#comment-13098843 ] 

Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

I did not spent too much time on ParallelCompactionIterable, but I suppose there wasn't much changes since last time. On the rest of the rebase, two comments:
* CompactionIterable.getReduced is changed so that it will return compactedRow if empty instead of null. This is buggy because we expect the null to be filtered by 'Iterators.filter(iter, Predicates.notNull())' in CompactionManager (which it won't do anymore). CompactionsPurgeTest is failing because of that.
* In CompactionIterable constructor, there is a line ending with 2 semi-colons.

Apart from that, patch lgtm from a technical standpoint. There is obviously the question of its usefulness. In particular, it's unclear it will be of any help when using leveldb compaction and the current multi-threading of compaction tasks (even with SSD that is). But the diff is not very complicated (if we exclude the new ParallelCompactionIterable class) so why not.


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 1.0
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0002-parallel-compaction.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Reviewer: slebresne
    Assignee: Jonathan Ellis

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-trunk.txt

Rebased to trunk.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 1.0
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-v3.txt

bq. PCI.Reducer.getCompactedRow unwraps NotifyingSSTableIterator

fixed

bq. it doesn't seem like we ever call close() on the SSTableIdentityIterator

fixed

bq. we need to reuse the trick of DebuggableThreadPoolExecutor

added ACR.close to fix

bq. we already queue up rows in the MergeTask executor, so it feels like it would be simple to use direct handoff here

Simpler, but then any deserializer that can't deserialize the next row by the time the next merge slot is open, will stall the merge.  My thinking was that allowing per-deserializer buffers will keep the pipeline full better.

bq. the memory blow up is (potentially) much more than the 2x (compared to mono-threaded) in the description of this ticket

It's not so bad, because we can assume N >= 2 sstables, and we restrict each deserializer to 1/N of in-memory limit.  So I think we come close to 2x overall.  (And if we don't, I'd rather adjust our estimate, than make it less performant/more complex. :)

bq. we should probably remove the occurrence in PreCompactedRow as it's still multi-threaded while in the MergeTask

refactored PCR constructors to do this.

bq. we create one Comparator<IColumnIterator> each time instead of having a private static final one

added RowContainer.comparator

bq. there is a "race" when updating the bytesRead

fixed, and changed bytesRead to volatile


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901-v3.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 0002-parallel-compaction.txt
                0001-refactor-CompactionIterator-CompactionIterable.txt

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-refactor-CompactionIterator-CompactionIterable.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Sylvain Lebresne (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079497#comment-13079497 ] 

Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

Nevermind, LazilyCompactedRowTest seems broken with the patch above.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt, 0003-Fix-LCR.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 0002-parallel-compactions.txt
                0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compactions.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080241#comment-13080241 ] 

Hudson commented on CASSANDRA-2901:
-----------------------------------

Integrated in Cassandra-0.8 #258 (See [https://builds.apache.org/job/Cassandra-0.8/258/])
    refactorings and corner-case bug fixes:
- avoid modifying the List of rows after passing it to a LazilyCompactedRow
- account for possibility that all data compacted by LCR has expired
- clean up code duplication around shouldPurge cleanup
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1154369
Files : 
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
* /cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
* /cassandra/branches/cassandra-0.8/CHANGES.txt
* /cassandra/branches/cassandra-0.8/conf/cassandra.yaml
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
* /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-refactor-CompactionIterator-CompactionIterable.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073682#comment-13073682 ] 

Jonathan Ellis edited comment on CASSANDRA-2901 at 8/1/11 7:54 PM:
-------------------------------------------------------------------

bq. PCI.Reducer.getCompactedRow unwraps NotifyingSSTableIterator

fixed

bq. it doesn't seem like we ever call close() on the SSTableIdentityIterator

added ACR.close to fix

bq. we need to reuse the trick of DebuggableThreadPoolExecutor

fixed

bq. we already queue up rows in the MergeTask executor, so it feels like it would be simple to use direct handoff here

Simpler, but then any deserializer that can't deserialize the next row by the time the next merge slot is open, will stall the merge.  My thinking was that allowing per-deserializer buffers will keep the pipeline full better.

bq. the memory blow up is (potentially) much more than the 2x (compared to mono-threaded) in the description of this ticket

It's not so bad, because we can assume N >= 2 sstables, and we restrict each deserializer to 1/N of in-memory limit.  So I think we come close to 2x overall.  (And if we don't, I'd rather adjust our estimate, than make it less performant/more complex. :)

bq. we should probably remove the occurrence in PreCompactedRow as it's still multi-threaded while in the MergeTask

refactored PCR constructors to do this.

bq. we create one Comparator<IColumnIterator> each time instead of having a private static final one

added RowContainer.comparator

bq. there is a "race" when updating the bytesRead

fixed, and changed bytesRead to volatile


      was (Author: jbellis):
    bq. PCI.Reducer.getCompactedRow unwraps NotifyingSSTableIterator

fixed

bq. it doesn't seem like we ever call close() on the SSTableIdentityIterator

fixed

bq. we need to reuse the trick of DebuggableThreadPoolExecutor

added ACR.close to fix

bq. we already queue up rows in the MergeTask executor, so it feels like it would be simple to use direct handoff here

Simpler, but then any deserializer that can't deserialize the next row by the time the next merge slot is open, will stall the merge.  My thinking was that allowing per-deserializer buffers will keep the pipeline full better.

bq. the memory blow up is (potentially) much more than the 2x (compared to mono-threaded) in the description of this ticket

It's not so bad, because we can assume N >= 2 sstables, and we restrict each deserializer to 1/N of in-memory limit.  So I think we come close to 2x overall.  (And if we don't, I'd rather adjust our estimate, than make it less performant/more complex. :)

bq. we should probably remove the occurrence in PreCompactedRow as it's still multi-threaded while in the MergeTask

refactored PCR constructors to do this.

bq. we create one Comparator<IColumnIterator> each time instead of having a private static final one

added RowContainer.comparator

bq. there is a "race" when updating the bytesRead

fixed, and changed bytesRead to volatile

  
> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901-v3.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 2901-trunk.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0003-Fix-LCR.patch)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Yewei Zhang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yewei Zhang updated CASSANDRA-2901:
-----------------------------------

    Attachment: 2901.patch

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Fix Version/s: 0.8.2

bq. IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions

Now that we have CASSANDRA-2879 done in trunk we *could* partially parallelize large rows -- the first pass (to determine the compacted row size) needs to be serial, but actually writing the data could be done concurrently with another writer.

Unclear how much extra work that would be...  and it does depend on the trunk work.

My gut: unless it looks super easy to do on top of 2879 (in which case we can move this whole ticket to trunk) let's stick to the original plan.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.2
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). One thread merging corresponding rows from each input sstable. One thread doing serialize + writing the output. This should give us between 2x and 3x speedup (depending how much doing the merge on another thread than write saves us).
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. (If this is a concern, we already have a tunable to limit the number of sstables merged at a time in a single CF.)
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13078591#comment-13078591 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

Split out some fixes to the SSTII bytes tracker getting out of sync w/ the underlying stream, and did some cleanup to make the streamed/file versions less divergent.

Also adds parallel compaction testing to LazilyCompactedRowTest.

CliTest and DefsTest generate compaction loads (in DefsTest's case, on the Migrations CF -- haven't dug into CliTest as much) that break w/ parallel enabled, although the test doesn't actually fail (argh).

Haven't figured out what's causing that, and haven't come up with a way to reproduce in a "real" test yet.  The DefsTest does mix lazy/nonlazy iteration in the merge, which may be relevant.

bq. I'm also no proposing to complicate things.  

You're right, poor choice of words on my part.

Latest gives the merge executor a SynchronousQueue.  I think that's a better way to cut worst-case, than the Deserializer, for the reason given previously.

bq. 'if...instanceof' business is a bit error prone/ugly

Agreed. Added getColumnCount + reset to ICountableColumnIterator sub-interface.

bq. say how multithreaded_compaction is different from concurrent_compactors and that multithread_compaction is likely only useful for SSDs in cassandra.yaml

done

bq. The bytesRead "race" should also be fixed in CompactionIterable

done

bq. I would have put the code in CompactedRow.close() at the end of the LCR.write() instead of adding a new method, as it avoids forgetting calling close 

I did consider that, but it feels weird to me to have write implicitly call close.  I guess we could just change the method name? :)

bq. We can make PreCompactedRow.removeDeletedAndOldShards a public method and use it in PCI.MergeTask

done

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compactions.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Yewei Zhang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072054#comment-13072054 ] 

Yewei Zhang commented on CASSANDRA-2901:
----------------------------------------

Please help to review and see if the workflow and logic is correct. Some junit tests are broken and I am working on them. 

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Sylvain Lebresne (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079938#comment-13079938 ] 

Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

bq. Well, I changed the semantics of maxInMemorySize.

Oups, missed that, sorry. Nevermind then.

Patch lgtm, +1. I would deactivate the use of parallel compaction for the test by default before committing though, as it is not what we care the more for.

About Stu remarks, the goal is to speed up a given compaction. It is clearly for people having SSDs, so the goal is imho more about having compactions done as quickly as possible (to have the more consistent possible reads in particular) rather than the catching up of compactions being behind (which CASSANDRA-2191 solves indeed). Even without triggering major compaction, you can have a minor compaction that takes time. If you have the I/O capacity and the idle cpu, why not having those get done more quickly ?

As you mentioned, it is also useful for things like repair for which, let's be honest, the "other efforts underway to make the tasks less like a major compaction" are only idea at this point (not that they are bad idea or anything, let's not just consider them as done).

For CASSANDRA-1608, I admit it is less clear how useful exactly that will be with leveled compaction. However, it is not clear that will be useless either and again, leveled compaction is not the default compaction yet.

Anyway, I have to admit that it is not the ticket I care the more for, and it will likely not be useful for everyone. However it's done, the complexity is really all in ParallelCompactionIterable (the rest of this patch being mostly trivial refactorings), and that last one is completely optional, so I don't see a good reason not to add this.


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080153#comment-13080153 ] 

Hudson commented on CASSANDRA-2901:
-----------------------------------

Integrated in Cassandra #1005 (See [https://builds.apache.org/job/Cassandra/1005/])
    fix tracker getting out of sync with underlying data source
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

jbellis : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1154274
Files : 
* /cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
* /cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
* /cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
* /cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 0002-parallel-compaction.txt
                0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079615#comment-13079615 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

bq. each deserializer now get the full maxInMemorySize

Well, I changed the semantics of maxInMemorySize.  So the constructor used outside of tests looks like

{code}
        this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Iterables.size(sstables));
{code}

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 2901-trunk.txt

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 1.0
>
>         Attachments: 2901-0.8.txt, 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Stu Hood (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079703#comment-13079703 ] 

Stu Hood commented on CASSANDRA-2901:
-------------------------------------

I like the addition of close() to AbstractCompactedRow, but...

At first glance, I'm not convinced the complexity of this ticket is worth the benefits:
* If compaction is falling behind during normal operation (aka, too many sstables), the multithreaded compaction from CASSANDRA-2191 should kick in appropriately, assuming you have reasonable compaction thresholds (for example, you can increase parallelism by lowering the MAX_THRESHOLD)
* For repair-related compactions, it sounds like there are a few other efforts underway to make the tasks less like a major compaction
* Major compactions would benefit from this ticket, but I think our endgame is that major compactions will be a thing of the past

Finally, with CASSANDRA-1608, there should be more and easier available options for parallelizing compaction, since there will be more, smaller files.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080033#comment-13080033 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

bq. the complexity is really all in ParallelCompactionIterable (the rest of this patch being mostly trivial refactorings)

I'll pull the patch apart into non-pci-specific refactoring, and pci-specific (which I should have done in the first place), to make it more clear what complexity we're introducing in the rest of the system.

In the meantime I'll commit 0001 which is a bugfix not specific to PCI.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Yewei Zhang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072482#comment-13072482 ] 

Yewei Zhang commented on CASSANDRA-2901:
----------------------------------------

"ReaderThread" multithreads the merges but it looks like reading the source sstables is still single-threaded (per merge). Somehow we need to get the PrecompactedRow row.getColumnFamilyWithColumns call in its own thread. Again I like the SSTII wrapper that uses a Future to pull the data from a task on a (per-source-sstable) executor pattern here, but I'm sure there are other options. (Be careful to let LazilyCR tasks stay single-threaded, though.)

mm,looking more into the implementation, the serialization is done in the constructor and is handled in getReduced() method. so the serialization is handled in multi threads. I think it is very hard for LazilyCR to be single-threaded. To make this happen, there has to be a mechanism to tell the executor to hold on other threads and let only the LazilyCR thread do the work. Maybe I am missing something here. The approach I took is to have the maximumLimit to be (max memory)/pool size. This is not ideal either since only in the worst case senario, all threads are handling that much data. 

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Yewei Zhang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072432#comment-13072432 ] 

Yewei Zhang commented on CASSANDRA-2901:
----------------------------------------

Jonathan, 

Thanks for the comments. 

Let me look into the ReaderThread to make it multi-threaded.

I don't see the reason to have two different sentinel conditions, why not just use NO_ROW in both cases?

This is because the expected key is sorted. NO_KEY would possibly show up in the top when the queue has not yet been consumed. 



> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080225#comment-13080225 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

Committed the first set of refactorings to 0.8 in r1154369.  Attaching the second, which is trunk-specific, and the remaining parallel compaction work.  It does look substantially self-contained.  The main ugliness is introducing ICountableColumnIterator, which feels like altogether too many column iterators (IColumnIterator and IIterableColumns being the others).

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079217#comment-13079217 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

New patches have some cleanup + debugging but are functionally unchanged.

The source of the bug demonstrated by DefsTest is that LCR computes a different serialized size on the first time through the data, than the second.  The second agrees with what is actually written to disk.  I don't know what is causing the discrepancy yet; I would expect that calling reset() on each iterator before reducing would put everything into a known-good state but no.

Also verified that forcing parallel compaction to never use LCR makes the problem go away.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0001-refactor-CompactionIterator-CompactionIterable.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 2901-trunk.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Wojciech Meler (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13065851#comment-13065851 ] 

Wojciech Meler commented on CASSANDRA-2901:
-------------------------------------------

Maybe it would be nice to spawn separate compaction process? 
It is quite GC-intensive operation, so maybe it make sense to separate it from server?
It would also be nice to have cli tool to compact files without cassandra server for backup purpose - why not spawn such tool from server?

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). One thread merging corresponding rows from each input sstable. One thread doing serialize + writing the output. This should give us between 2x and 3x speedup (depending how much doing the merge on another thread than write saves us).
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. (If this is a concern, we already have a tunable to limit the number of sstables merged at a time in a single CF.)
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment:     (was: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt)

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-refactor-CompactionIterator-CompactionIterable.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jonathan Ellis updated CASSANDRA-2901:
--------------------------------------

    Attachment: 0002-parallel-compaction.txt
                0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Sylvain Lebresne (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sylvain Lebresne updated CASSANDRA-2901:
----------------------------------------

    Attachment: 0003-Fix-LCR.patch

The DefsTest and CliTest problem is because we don't ignore purged tombstone on the first pass when computing the serializedSize. Attaching a small patch with the fix. The patch also fixes a failure with StreamingTransferTest: in SSTII, the columnPosition should be set for non file input, otherwise headerSiez() returns the wrong value and the assertion in getColumnFamilyWithColumns is triggered. This seems to fix all unit tests here.

The patch looks good, but each deserializer now get the full maxInMemorySize instead of maxInMemorySize / nb(Deserializers). Was that intended ?


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt, 0003-Fix-LCR.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072156#comment-13072156 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

Thanks, Yewei!

Comments:

- I think we can simplify the "wait for row to be merged" logic by noting that CompactionTask is itself single-threaded.  So I'd have PCI.next return an AbstractCompactedRow subclass--FutureACR?--that knows how to wait for the merge to finish.  Then we don't need any special logic in PCI itself, we can just pull rows-being-merged off in order and leave the blocking for the merge to finish, to CompactionTask.
- "ReaderThread" multithreads the merges but it looks like reading the source sstables is still single-threaded (per merge).  Somehow we need to get the PrecompactedRow row.getColumnFamilyWithColumns call in its own thread.  Again I like the SSTII wrapper that uses a Future to pull the data from a task on a (per-source-sstable) executor pattern here, but I'm sure there are other options.  (Be careful to let LazilyCR tasks stay single-threaded, though.)
- I don't see the reason to have two different sentinel conditions, why not just use NO_ROW in both cases?
- Note on style: better to name the things you run on executors "Task" (e.g. MergerTask) than "Thread" because "MergerThread" implies that it is a Thread subclass.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13079536#comment-13079536 ] 

Jonathan Ellis commented on CASSANDRA-2901:
-------------------------------------------

I added some debug logging that shows that it's actually including extra columns in the first pass.

[pass 1]
{noformat}
...
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 16481 to serializedSize for 2ab319d0beba11e00000fe8ebeead9cb
[the next are bogus]
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 17075 to serializedSize for 2acf0640beba11e00000fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 17585 to serializedSize for 2af15b50beba11e00000fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,056 LazilyCompactedRow.java (line 225) added 17596 to serializedSize for 2af8fc70beba11e00000fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,057 LazilyCompactedRow.java (line 225) added 17493 to serializedSize for 2b0335a0beba11e00000fe8ebeead9cb
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,057 LazilyCompactedRow.java (line 225) added 17493 to serializedSize for 2b200c70beba11e00000fe8ebeead9cb
{noformat}

[pass 2]
{noformat}
...
DEBUG [CompactionExecutor:1] 2011-08-04 11:52:42,088 LazilyCompactedRow.java (line 225) added 16481 to serializedSize for 2ab319d0beba11e00000fe8ebeead9cb
{noformat}

Baffling.

> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt, 0003-Fix-LCR.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Jonathan Ellis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13080225#comment-13080225 ] 

Jonathan Ellis edited comment on CASSANDRA-2901 at 8/5/11 9:03 PM:
-------------------------------------------------------------------

Committed the first set of refactorings to 0.8 in r1154369, and merged to trunk.  Attaching the second, which is trunk-specific, and the remaining parallel compaction work.  It does look substantially self-contained.  The main ugliness is introducing ICountableColumnIterator, which feels like altogether too many column iterators (IColumnIterator and IIterableColumns being the others).

      was (Author: jbellis):
    Committed the first set of refactorings to 0.8 in r1154369.  Attaching the second, which is trunk-specific, and the remaining parallel compaction work.  It does look substantially self-contained.  The main ugliness is introducing ICountableColumnIterator, which feels like altogether too many column iterators (IColumnIterator and IIterableColumns being the others).
  
> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.4
>
>         Attachments: 0001-fix-tracker-getting-out-of-sync-with-underlying-data-s.txt, 0002-parallel-compaction.txt
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (CASSANDRA-2901) Allow taking advantage of multiple cores while compacting a single CF

Posted by "Sylvain Lebresne (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CASSANDRA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13076246#comment-13076246 ] 

Sylvain Lebresne commented on CASSANDRA-2901:
---------------------------------------------

bq. My thinking was that allowing per-deserializer buffers will keep the pipeline full better.

Make sense.

bq. It's not so bad, because we can assume N >= 2 sstables, and we restrict each deserializer to 1/N of in-memory limit. So I think we come close to 2x overall. (And if we don't, I'd rather adjust our estimate, than make it less performant/more complex.)

I still think that this pipeline is a bit too deep. I agree that each deserializer has 1/N in-memory limit, but that means that for a given row (a given key if you prefer), we have up to in_memory_limit worth of data in memory (since we have N sstables). And the number of such rows that can be in memory at a given time is:
  # 2 * mem_limit for each deserializer (the row in the queue and the one being deserialized)
  # mem_limit for each each MergeTask in memory. Given that the MergeTask executor has nb_processors threads and a nb_processors queue, this means up to 2 * nb_processors * mem_limit.
  # if we want to be exact, the reducer thread can also hold on MergeTask while it is blocked on submitting to the executor.
That is, we can have up to a (2 * nb_processors + 3) blowup. On a 8 or 16 cores, we're far from the 2x.

Now I understand the willingness to keep the pipeline full, and that in general we shouldn't be too close of this theoretical limit, but I do think that as is, it's too easy to OOM, or to force people to use a very low in-memory limit which would make this less useful than it should.

I'm also no proposing to complicate things. What I would do is using direct hand-off for the merge task executor and to update the maxInMemory limit we give to each deserializer. We could do only the limit update, but we would then need to put it even lower and I'm not sure it would be a good trade-off.

Other comments:
* We feed NotifyingSSTableIdentityIterator to LazilyCompactedRow. However, in LCR.getEstimatedColumnCount(), we won't report the right count because NSSTII is not an instance of SSTII. Same thing in LCR.iterator, we won't call reset correctly on the wrapped SSTII (imho we should add getColumnCount and reset to the IColumnIterator interface (or make a sub-interface with those) because that 'if...instanceof' business is a bit error prone/ugly).
* We could maybe say how multithreaded_compaction is different from concurrent_compactors and that multithread_compaction is likely only useful for SSDs in cassandra.yaml ?
* The bytesRead "race" should also be fixed in CompactionIterable and the 'let's use a static final comparator' stands there too. But maybe we should fix that elsewhere.
* I would have put the code in CompactedRow.close() at the end of the LCR.write() instead of adding a new method, as it avoids forgetting calling close and I don't see a good reason why close would need to be separated.
* We can make PreCompactedRow.removeDeletedAndOldShards a public method and use it in PCI.MergeTask.


> Allow taking advantage of multiple cores while compacting a single CF
> ---------------------------------------------------------------------
>
>                 Key: CASSANDRA-2901
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-2901
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Jonathan Ellis
>            Assignee: Jonathan Ellis
>            Priority: Minor
>             Fix For: 0.8.3
>
>         Attachments: 2901-v2.txt, 2901-v3.txt, 2901.patch
>
>
> Moved from CASSANDRA-1876:
> There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot copying to/from buffers.
> So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). A thread pool (one per core?) merging corresponding rows from each input sstable. One thread doing serialize + writing the output (this has to wait for the merge threads to complete in-order, obviously). This should take us from being CPU bound on SSDs (since only one core is compacting) to being I/O bound.
> This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.  You'll also want a small queue size for the serialize-merged-rows executor.
> Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a
> threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.
> IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira