You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cassandra.apache.org by Piotr Kołaczkowski <pk...@ii.pw.edu.pl> on 2011/12/07 17:56:25 UTC

Ticket CASSANDRA-3578 - Multithreaded CommitLog

Hello everyone,

As an interview task I've got to make CommitLog multithreaded. I'm new 
to Cassandra project and therefore, before I start modifying code, I 
have to make sure I understand what is going on there correctly.
Feel free to correct anything I got wrong or partially wrong.

1. The CommitLog singleton object is responsible for receiving 
RowMutation objects by its add method. The add method is thread-safe and 
is aimed to be called by many threads adding their RowMutations 
independently.

2. Each invocation of CommitLog#add  puts a new task onto the queue. 
This task is represented by LogRecordAdder callable object, which is 
responsible for actually calling the CommitLogSegment#write method for 
doing all the "hard work" of serializing the RowMutation object, 
calculating CRC and writing that to the memory mapped CommitLogSegment 
file buffer. The add method immediately returns a Future object, which 
can be waited for (if needed) - it will block until the row mutation is 
saved to the log file and (optionally) synced.

3. The queued tasks are processed one-by-one, sequentially by the 
appropriate ICommitLogExecutorService. This service also controls 
syncing the active memory mapped segments. There are two sync strategies 
available: periodic and batched. The periodic simply calls sync 
periodically by asynchronously putting appropriate sync task into the 
queue, inbetween the LogRecordAdder tasks. The LogRecordAdder tasks are 
"done" as soon as they are written to the log, so the caller *won't 
wait* for the sync. On the other hand, the batched strategy 
(BatchCommitLogExecutorService), performs the tasks in batches, each 
batch finished with an sync operation. The tasks are marked as done 
*after* the sync operation is finished. This deferred task marking  is 
achieved thanks to CheaterFutureTask class - allowing to run the task 
without immediately marking FutureTask as done. Nice. :)

4. The serialized size of the RowMutation object is calculated twice: 
once before submitting to the ExecutorService - to detect if it is not 
larger than the segment size, and then after being taken from the queue 
for execution - to check if it fits into the active CommitLogSegment, 
and if it doesn't, to activate a new CommitLogSegment. Looks to me like 
a point needing optimisation. I couldn't find any code for caching the 
serialized size to avoid doing it twice.

5. The serialization, CRC calculation and actual commit log writes are 
happening sequentially. The aim of this ticket is to make it parallel.

Questions:
1. What happens to the recovery, if the power goes off before the log 
has been synced, and it has been written partially (e.g. it is truncated 
in the middle of the RowMutation data)? Are incomplete RowMutation 
writes detected only by means of CRC (CommitLog around lines 237-240), 
or is there some other mechanism for it?

2. Is the CommitLog#add method allowed to do some heavier computations? 
What is the contract for it? Does it have to return immediately or can I 
move some code into it?

Solutions I consider (please comment):

1. Moving the serialized size calculation, serialization and CRC 
calculation totally before the executor service queue, so that these 
operations would be parallel, and performed once per RowMutation object. 
The calculated size / data array / CRC value would be appended to the 
task and put into the queue. Then copying that into the commit log would 
proceed sequentially - the task would contain only code for log writing. 
This is the safest and easiest solution, but also the least performant, 
because copying is still sequential and still might be a bottleneck. The 
logic of allocating new commit log segments and syncing remains unchanged.

2. Moving the serialized size calculation, serialization, CRC 
calculation *and commit log writing* before the executor service queue. 
This raises immediately some problems / questions:
a) The code for segment allocation needs to be changed, as it becomes 
multithreaded. It can be done using AtomicInteger.compareAndSet, so that 
each RowMutation gets its own, non-overlapping piece of commit log to 
write into.
b) What happens if there is not enough free space in the current active 
segment? Do we allow more active segments at once? Or do we restrict the 
parallelism to writing just into a single active segment (I don't like 
it, as it would be for certain less performant, because we would have to 
wait for finishing the current active segement, before we can start a 
new one)?
c) Is the recovery method ready for reading partially written (invalid) 
RowMutation, that is not the last mutation in the commit log? If we 
allow writing several row mutations parallel, it has to be.
d) The tasks are sent to the queue only for wait-for-sync functionality 
- they would not contain any code to execute, because everything would 
be already done.

3. Everything just as 2., but with an addition, that the serialization 
code writes directly into the target memory mapped buffer and not into a 
temporary byte array. This would save us copying and also put less 
strain on GC.

Sorry, for such a long e-mail and best regards,
Piotr Kolaczkowski

-- 
Piotr Kołaczkowski
Instytut Informatyki, Politechnika Warszawska
Nowowiejska 15/19, 00-665 Warszawa
e-mail: pkolaczk@ii.pw.edu.pl
www: http://home.elka.pw.edu.pl/~pkolaczk/


Re: Ticket CASSANDRA-3578 - Multithreaded CommitLog

Posted by Jonathan Ellis <jb...@gmail.com>.
2011/12/8 Piotr Kołaczkowski <pk...@ii.pw.edu.pl>:
> Can someone tell me what is the use pattern of the CommitLog#add method? I
> mean, is it possible, that a single thread calls add many times, remembers
> the returned Future objects and *then* waits on all / some of them? Or is it
> always like: add, then wait (until the Future is ready), add, wait, add,
> wait... ?

I'm not sure what you're looking at, since CommitLog#add returns void
and so does AbstractCommitLogExecutorService#add.  There is only one
caller of CommitLog#add outside of the test code.

I think you should take a look at the executor implementations and
what kinds of guarantees they're trying to provide.  That may make it
more clear what kind of approach you want to take.  (I suspect it's
going to be a lot more difficult to multithread the Batch executor,
for instance.  Does that mean we ignore it entirely and say "sorry, we
can only provide single-threaded commitlog in batch mode?"  Or take
two different approaches?  Or can we do one approach for both after
all?)

I'd also point you to RowMutation.preserializedBuffers -- when we
receive a RM from another node, we keep the byte[] we deserialized it
from, so we don't need to reserialize it for the CommitLog.  So I'd
avoid spending a ton of effort parallelizing the serialize, since in
the real world it's usually a no-op.

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com

Re: Ticket CASSANDRA-3578 - Multithreaded CommitLog

Posted by Piotr Kołaczkowski <pk...@ii.pw.edu.pl>.
W dniu 2011-12-08 08:40, Jonathan Ellis pisze:
> 2011/12/8 Piotr Kołaczkowski<pk...@ii.pw.edu.pl>:
>> Right, this would be the best option to have an ability to write into
>> multiple log files, put on multiple disks. I'm not sure if it is part of
>> that ticket, though.
> It's not.  I don't think anyone needs more than 80MB/s or so of
> commitlog bandwidth for a while.
>
>> BTW: I'm not so sure if multiple, parallel writes to a memory mapped file
>> would be actually slower or faster than sequential writes. I think the OS
>> would optimise the writes so that physically they would be sequential, or
>> even delay them until fsync (or low cached disk buffers), so no performance
>> loss would occur
> Right.  What we're trying to fix here is having a single thread doing
> the copying + checksumming being a bottleneck.  The i/o pattern should
> stay more or less the same.
>

Thanks for explanation. This is exactly what I understood from the 
ticket. Also calculating the serialized size twice looks like a waste of 
CPU to me (or am I wrong and it is calculated once?)
Now, the longer I think about this ticket, I've got more questions.

Can someone tell me what is the use pattern of the CommitLog#add method? 
I mean, is it possible, that a single thread calls add many times, 
remembers the returned Future objects and *then* waits on all / some of 
them? Or is it always like: add, then wait (until the Future is ready), 
add, wait, add, wait... ? If the former is true, then we would benefit 
from returning the Future objects as early as possible, without 
performing any heavy calculations in the add method, and making the code 
parallel on the output of the queue - by using some kind of a thread 
pool executor (or changing current commit log executors to have more 
than one worker thread). Then, even if a single thread writes to the 
CommitLog many RowMutations, the CRC and copying would be still parallel 
and fast. What do you think of it? Does it make sense? In the future, 
such architecture could be extended to supporting many log files on 
separate disks :)


To summarize:

The current architecture:
many threads (calc. size) ->  queue -> one thread (calc. size, 
serialize, CRC, allocate, copy, fsync)

My 1st proposal:
many threads (calc. size, serialize, CRC) -> queue -> one thread 
(allocate, copy, fsync)

My 2nd proposal:
many threads (calc. size, allocate, serialize, CRC, copy) -> queue -> 
one thread (fsync)

My 3rd proposal:
many threads (calc. size, allocate, serialize directly into buffer, CRC) 
-> queue -> one thread (fsync)

My 4th proposal:
many threads (no op) -> queue -> n threads, where n = number of cores  
(calc. size, allocate, serialize, CRC, copy) -> queue -> one thread (fsync)

Which one do you like the most?

-- 
Piotr Kołaczkowski
Instytut Informatyki, Politechnika Warszawska
Nowowiejska 15/19, 00-665 Warszawa
e-mail: pkolaczk@ii.pw.edu.pl
www: http://home.elka.pw.edu.pl/~pkolaczk/


Re: Ticket CASSANDRA-3578 - Multithreaded CommitLog

Posted by Jonathan Ellis <jb...@gmail.com>.
2011/12/8 Piotr Kołaczkowski <pk...@ii.pw.edu.pl>:
> Right, this would be the best option to have an ability to write into
> multiple log files, put on multiple disks. I'm not sure if it is part of
> that ticket, though.

It's not.  I don't think anyone needs more than 80MB/s or so of
commitlog bandwidth for a while.

> BTW: I'm not so sure if multiple, parallel writes to a memory mapped file
> would be actually slower or faster than sequential writes. I think the OS
> would optimise the writes so that physically they would be sequential, or
> even delay them until fsync (or low cached disk buffers), so no performance
> loss would occur

Right.  What we're trying to fix here is having a single thread doing
the copying + checksumming being a bottleneck.  The i/o pattern should
stay more or less the same.

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com

Re: Ticket CASSANDRA-3578 - Multithreaded CommitLog

Posted by Piotr Kołaczkowski <pk...@ii.pw.edu.pl>.
Right, this would be the best option to have an ability to write into 
multiple log files, put on multiple disks. I'm not sure if it is part of 
that ticket, though. Maybe we should split it into two things: parallel 
serialization / CRC and parallel writes to multiple logfiles (as another 
ticket). Looks like a major commitlog refactoring, including touching 
the logfile segment management and logfile recovery code.

BTW: I'm not so sure if multiple, parallel writes to a memory mapped 
file would be actually slower or faster than sequential writes. I think 
the OS would optimise the writes so that physically they would be 
sequential, or even delay them until fsync (or low cached disk buffers), 
so no performance loss would occur, while moving data from temporary 
array to shared buffer memory would be actually faster (and possibility 
of avoiding temporary arrays by serializing directly into the shared 
buffer at all is also  promising). I think we should benchmark / profile 
this first (I can do it) and see how it is in reality, unless someone 
has already done that. If you are interested, I can find some time today 
evening to do it.


W dniu 2011-12-07 21:57, Jeremiah Jordan pisze:
>
> Another option is to have multiple threads reading from the queue and 
> writing to their own commit log files.  If you have multiple commit 
> log directories with each having its own task writing to it, you can 
> keep the "only sequential writes" optimization.  Multiple writers to 
> one disk only makes sense if you are using a SSD for storage, other 
> wise you don't only have have sequential writes, which would slow down 
> the writing.
>
> On 12/07/2011 10:56 AM, Piotr Kołaczkowski wrote:
>>
>> Hello everyone,
>>
>> As an interview task I've got to make CommitLog multithreaded. I'm 
>> new to Cassandra project and therefore, before I start modifying 
>> code, I have to make sure I understand what is going on there correctly.
>> Feel free to correct anything I got wrong or partially wrong.
>>
>> 1. The CommitLog singleton object is responsible for receiving 
>> RowMutation objects by its add method. The add method is thread-safe 
>> and is aimed to be called by many threads adding their RowMutations 
>> independently.
>>
>> 2. Each invocation of CommitLog#add  puts a new task onto the queue. 
>> This task is represented by LogRecordAdder callable object, which is 
>> responsible for actually calling the CommitLogSegment#write method 
>> for doing all the "hard work" of serializing the RowMutation object, 
>> calculating CRC and writing that to the memory mapped 
>> CommitLogSegment file buffer. The add method immediately returns a 
>> Future object, which can be waited for (if needed) - it will block 
>> until the row mutation is saved to the log file and (optionally) synced.
>>
>> 3. The queued tasks are processed one-by-one, sequentially by the 
>> appropriate ICommitLogExecutorService. This service also controls 
>> syncing the active memory mapped segments. There are two sync 
>> strategies available: periodic and batched. The periodic simply calls 
>> sync periodically by asynchronously putting appropriate sync task 
>> into the queue, inbetween the LogRecordAdder tasks. The 
>> LogRecordAdder tasks are "done" as soon as they are written to the 
>> log, so the caller *won't wait* for the sync. On the other hand, the 
>> batched strategy (BatchCommitLogExecutorService), performs the tasks 
>> in batches, each batch finished with an sync operation. The tasks are 
>> marked as done *after* the sync operation is finished. This deferred 
>> task marking  is achieved thanks to CheaterFutureTask class - 
>> allowing to run the task without immediately marking FutureTask as 
>> done. Nice. :)
>>
>> 4. The serialized size of the RowMutation object is calculated twice: 
>> once before submitting to the ExecutorService - to detect if it is 
>> not larger than the segment size, and then after being taken from the 
>> queue for execution - to check if it fits into the active 
>> CommitLogSegment, and if it doesn't, to activate a new 
>> CommitLogSegment. Looks to me like a point needing optimisation. I 
>> couldn't find any code for caching the serialized size to avoid doing 
>> it twice.
>>
>> 5. The serialization, CRC calculation and actual commit log writes 
>> are happening sequentially. The aim of this ticket is to make it 
>> parallel.
>>
>> Questions:
>> 1. What happens to the recovery, if the power goes off before the log 
>> has been synced, and it has been written partially (e.g. it is 
>> truncated in the middle of the RowMutation data)? Are incomplete 
>> RowMutation writes detected only by means of CRC (CommitLog around 
>> lines 237-240), or is there some other mechanism for it?
>>
>> 2. Is the CommitLog#add method allowed to do some heavier 
>> computations? What is the contract for it? Does it have to return 
>> immediately or can I move some code into it?
>>
>> Solutions I consider (please comment):
>>
>> 1. Moving the serialized size calculation, serialization and CRC 
>> calculation totally before the executor service queue, so that these 
>> operations would be parallel, and performed once per RowMutation 
>> object. The calculated size / data array / CRC value would be 
>> appended to the task and put into the queue. Then copying that into 
>> the commit log would proceed sequentially - the task would contain 
>> only code for log writing. This is the safest and easiest solution, 
>> but also the least performant, because copying is still sequential 
>> and still might be a bottleneck. The logic of allocating new commit 
>> log segments and syncing remains unchanged.
>>
>> 2. Moving the serialized size calculation, serialization, CRC 
>> calculation *and commit log writing* before the executor service 
>> queue. This raises immediately some problems / questions:
>> a) The code for segment allocation needs to be changed, as it becomes 
>> multithreaded. It can be done using AtomicInteger.compareAndSet, so 
>> that each RowMutation gets its own, non-overlapping piece of commit 
>> log to write into.
>> b) What happens if there is not enough free space in the current 
>> active segment? Do we allow more active segments at once? Or do we 
>> restrict the parallelism to writing just into a single active segment 
>> (I don't like it, as it would be for certain less performant, because 
>> we would have to wait for finishing the current active segement, 
>> before we can start a new one)?
>> c) Is the recovery method ready for reading partially written 
>> (invalid) RowMutation, that is not the last mutation in the commit 
>> log? If we allow writing several row mutations parallel, it has to be.
>> d) The tasks are sent to the queue only for wait-for-sync 
>> functionality - they would not contain any code to execute, because 
>> everything would be already done.
>>
>> 3. Everything just as 2., but with an addition, that the 
>> serialization code writes directly into the target memory mapped 
>> buffer and not into a temporary byte array. This would save us 
>> copying and also put less strain on GC.
>>
>> Sorry, for such a long e-mail and best regards,
>> Piotr Kolaczkowski
>>
>


-- 
Piotr Kołaczkowski
Instytut Informatyki, Politechnika Warszawska
Nowowiejska 15/19, 00-665 Warszawa
e-mail: pkolaczk@ii.pw.edu.pl
www: http://home.elka.pw.edu.pl/~pkolaczk/


Re: Ticket CASSANDRA-3578 - Multithreaded CommitLog

Posted by Jeremiah Jordan <je...@morningstar.com>.
Another option is to have multiple threads reading from the queue and 
writing to their own commit log files.  If you have multiple commit log 
directories with each having its own task writing to it, you can keep 
the "only sequential writes" optimization.  Multiple writers to one disk 
only makes sense if you are using a SSD for storage, other wise you 
don't only have have sequential writes, which would slow down the writing.

On 12/07/2011 10:56 AM, Piotr Kołaczkowski wrote:
>
> Hello everyone,
>
> As an interview task I've got to make CommitLog multithreaded. I'm new 
> to Cassandra project and therefore, before I start modifying code, I 
> have to make sure I understand what is going on there correctly.
> Feel free to correct anything I got wrong or partially wrong.
>
> 1. The CommitLog singleton object is responsible for receiving 
> RowMutation objects by its add method. The add method is thread-safe 
> and is aimed to be called by many threads adding their RowMutations 
> independently.
>
> 2. Each invocation of CommitLog#add  puts a new task onto the queue. 
> This task is represented by LogRecordAdder callable object, which is 
> responsible for actually calling the CommitLogSegment#write method for 
> doing all the "hard work" of serializing the RowMutation object, 
> calculating CRC and writing that to the memory mapped CommitLogSegment 
> file buffer. The add method immediately returns a Future object, which 
> can be waited for (if needed) - it will block until the row mutation 
> is saved to the log file and (optionally) synced.
>
> 3. The queued tasks are processed one-by-one, sequentially by the 
> appropriate ICommitLogExecutorService. This service also controls 
> syncing the active memory mapped segments. There are two sync 
> strategies available: periodic and batched. The periodic simply calls 
> sync periodically by asynchronously putting appropriate sync task into 
> the queue, inbetween the LogRecordAdder tasks. The LogRecordAdder 
> tasks are "done" as soon as they are written to the log, so the caller 
> *won't wait* for the sync. On the other hand, the batched strategy 
> (BatchCommitLogExecutorService), performs the tasks in batches, each 
> batch finished with an sync operation. The tasks are marked as done 
> *after* the sync operation is finished. This deferred task marking  is 
> achieved thanks to CheaterFutureTask class - allowing to run the task 
> without immediately marking FutureTask as done. Nice. :)
>
> 4. The serialized size of the RowMutation object is calculated twice: 
> once before submitting to the ExecutorService - to detect if it is not 
> larger than the segment size, and then after being taken from the 
> queue for execution - to check if it fits into the active 
> CommitLogSegment, and if it doesn't, to activate a new 
> CommitLogSegment. Looks to me like a point needing optimisation. I 
> couldn't find any code for caching the serialized size to avoid doing 
> it twice.
>
> 5. The serialization, CRC calculation and actual commit log writes are 
> happening sequentially. The aim of this ticket is to make it parallel.
>
> Questions:
> 1. What happens to the recovery, if the power goes off before the log 
> has been synced, and it has been written partially (e.g. it is 
> truncated in the middle of the RowMutation data)? Are incomplete 
> RowMutation writes detected only by means of CRC (CommitLog around 
> lines 237-240), or is there some other mechanism for it?
>
> 2. Is the CommitLog#add method allowed to do some heavier 
> computations? What is the contract for it? Does it have to return 
> immediately or can I move some code into it?
>
> Solutions I consider (please comment):
>
> 1. Moving the serialized size calculation, serialization and CRC 
> calculation totally before the executor service queue, so that these 
> operations would be parallel, and performed once per RowMutation 
> object. The calculated size / data array / CRC value would be appended 
> to the task and put into the queue. Then copying that into the commit 
> log would proceed sequentially - the task would contain only code for 
> log writing. This is the safest and easiest solution, but also the 
> least performant, because copying is still sequential and still might 
> be a bottleneck. The logic of allocating new commit log segments and 
> syncing remains unchanged.
>
> 2. Moving the serialized size calculation, serialization, CRC 
> calculation *and commit log writing* before the executor service 
> queue. This raises immediately some problems / questions:
> a) The code for segment allocation needs to be changed, as it becomes 
> multithreaded. It can be done using AtomicInteger.compareAndSet, so 
> that each RowMutation gets its own, non-overlapping piece of commit 
> log to write into.
> b) What happens if there is not enough free space in the current 
> active segment? Do we allow more active segments at once? Or do we 
> restrict the parallelism to writing just into a single active segment 
> (I don't like it, as it would be for certain less performant, because 
> we would have to wait for finishing the current active segement, 
> before we can start a new one)?
> c) Is the recovery method ready for reading partially written 
> (invalid) RowMutation, that is not the last mutation in the commit 
> log? If we allow writing several row mutations parallel, it has to be.
> d) The tasks are sent to the queue only for wait-for-sync 
> functionality - they would not contain any code to execute, because 
> everything would be already done.
>
> 3. Everything just as 2., but with an addition, that the serialization 
> code writes directly into the target memory mapped buffer and not into 
> a temporary byte array. This would save us copying and also put less 
> strain on GC.
>
> Sorry, for such a long e-mail and best regards,
> Piotr Kolaczkowski
>