Posted to dev@ignite.apache.org by Maxim Muzafarov <ma...@gmail.com> on 2018/12/07 12:19:25 UTC

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Igniters,


Below you can find measurements and profiling results for the rebalancing
procedure with a single persisted cache (persisted on SSD drives).
I've posted other results for a different hardware configuration on the
Confluence page [1], along with CPU/disk/network utilization graphs. Only the
most important numbers are shown here.


batches: 146938
rows: 77321844
rows per batch: 526

time (total): 20 min
cache size: 78055 MB
rebalance speed: 63 MB/sec
rows per sec: 62965 rows
batch per sec: 119 batches


+ cache rebalance total: 100.00%
+ + preload on demander: 95.70%
+ + + offheap().invoke(..): 76.22%
+ + + + dataTree.invoke(..): 72.96%
+ + + + + BPlusTree.invokeDown(..): 11.98% <<<--!
+ + + + + FreeList.insertDataRow(..): 57.65% <<<--!
+ + + wal().log(..): 11.28%
+ message serialization: 0.13%
+ network delay between messages (total): 1.23%
+ make batch on supplier handleDemandMessage(..): 19.85%
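
A breakdown like the one above can be collected with simple per-section timers
(the class below is only an illustrative sketch, not the actual tracing code
used for these numbers):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

/** Accumulates nanoseconds per named section and prints each share of the total. */
public class SectionProfiler {
    private final Map<String, LongAdder> sections = new ConcurrentHashMap<>();

    /** Measures the given block and charges the elapsed time to the named section. */
    public <T> T measure(String name, Supplier<T> block) {
        long start = System.nanoTime();

        try {
            return block.get();
        }
        finally {
            sections.computeIfAbsent(name, k -> new LongAdder())
                .add(System.nanoTime() - start);
        }
    }

    /** Prints each section as a percentage of the given total duration. */
    public void print(long totalNanos) {
        sections.forEach((name, nanos) ->
            System.out.printf("+ %s: %.2f%%%n", name, 100.0 * nanos.sum() / totalNanos));
    }
}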


[1] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
>
> Maxim,
>
> Regarding MVCC - this is essentially a copy-on-write approach. A new entry is
> created on every update. Old versions are cleaned up asynchronously by dedicated
> threads (aka "vacuum").
>
> I looked at the document you mentioned, thank you for pointing to it. But it
> doesn't answer all questions around the existing design, and what I am
> trying to do is to understand how deeply we understand the current problems.
> It is very true that various subsystems, such as buffer managers, WALs,
> supporting structures, etc. incur very serious overhead. And when it comes
> to heavy operations, implementors typically seek a way to bypass as many
> components as possible, taking into account that different shortcuts lead to
> different types of side effects. And IMO our most important goal for now is
> to map out the space of possible improvements, and estimate their costs, risks
> and applicability across the product's configuration space.
>
> Let me clarify several questions about the current rebalance implementation,
> as I am not a big expert here.
> 1) Is it correct that the supplier sends only one message for every demand
> message? If yes, then streaming should improve network utilization a lot.
> 2) Is it correct that for user caches we process supply messages in the
> system pool? Did we consider moving them to the striped pool? Because if all
> operations on a single partition are ordered, we may apply a number of
> critical optimizations - bypassing the page cache and checkpointer for new
> entries, batched index updates, batched free list updates, etc. (see the
> sketch after this list).
> 3) It seems that the WAL should no longer be a problem for us [1]. What are
> the exact conditions under which it could be disabled on the supplier side?
> 4) Most important - have we tried to profile a plain single-threaded
> rebalance without concurrent write load? We need a clear understanding of
> where the time is spent - supplier/demander, cpu/network/disk,
> etc. Some Java tracing code should help.
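>
> To illustrate the ordering argument from point 2, here is a rough sketch of
> dispatching supply-message handling to a stripe chosen by partition id, so
> that all operations on a single partition stay ordered (class and method
> names are illustrative, not the actual Ignite striped pool API):
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
>
> /** Routes tasks for the same partition to the same single-threaded stripe. */
> public class StripedDispatcher {
>     private final ExecutorService[] stripes;
>
>     public StripedDispatcher(int stripeCnt) {
>         stripes = new ExecutorService[stripeCnt];
>
>         for (int i = 0; i < stripeCnt; i++)
>             stripes[i] = Executors.newSingleThreadExecutor();
>     }
>
>     /** All tasks for one partition run on one thread, hence in submission order. */
>     public void submit(int partId, Runnable task) {
>         stripes[partId % stripes.length].execute(task);
>     }
> }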
>
> And one question regarding the proposed implementation - how are we going to
> handle secondary indexes?
>
> [1] https://issues.apache.org/jira/browse/IGNITE-8017
>
>
> On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
>
> > Eduard,
> >
> > Thank you very much for the discussion!
> >
> > Your algorithm looks much better to me too and is easier to implement.
> > I'll update the appropriate process points on the IEP page of the proposed
> > rebalance procedure.
> > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > <ed...@gmail.com> wrote:
> > >
> > > So, after some discussion, I can describe another approach for building a
> > > consistent partition on the fly.
> > >
> > > 1. We make a checkpoint and fix the size of the partition in OffheapManager.
> > > 2. After the checkpoint finishes, we start sending the partition file
> > > (without any lock) to the receiver, from 0 to the fixed size.
> > > 3. Subsequent checkpoints, if they detect that they would overwrite some
> > > pages of the file being transferred, should write the previous state of a
> > > page to a dedicated file.
> > > So, we would have a list of pages written one by one; the page id is written
> > > in the page itself, so we can determine the page index. Let's name this file
> > > the log.
> > > 4. When the transfer is finished, the checkpointer stops updating the log
> > > file. Now we are ready to send it to the receiver.
> > > 5. On the receiver side we start merging the dirty partition file with the
> > > log (updating it with pages from the log file), as sketched below.
> > >
> > > So, the advantages of this method are:
> > > - checkpoint-thread work couldn't increase more than twofold;
> > > - checkpoint threads shouldn't have to wait for anything;
> > > - in the best case, we receive the partition without any extra effort.
> > >
> > >
> > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > eduard.shangareev@gmail.com> wrote:
> > >
> > > > Maxim,
> > > >
> > > > I have looked through your algorithm for reading a partition consistently,
> > > > and I have some questions/comments.
> > > >
> > > > 1. The algorithm requires heavy synchronization between the checkpoint
> > > > thread and the new-approach rebalance threads, because you need strong
> > > > guarantees not to start writing to or reading from a chunk which has been
> > > > updated or is being read by the counterpart.
> > > >
> > > > 2. Also, once we have started transferring a chunk, it couldn't be updated
> > > > in the original partition by checkpoint threads. They would have to wait
> > > > for the transfer to finish.
> > > >
> > > > 3. If sending is slow and the partition is updated, then in the worst case
> > > > the checkpoint threads would create a whole copy of the partition.
> > > >
> > > > So, what we have:
> > > > - on every page write, the checkpoint thread should synchronize with the
> > > > new-approach rebalance threads;
> > > > - the checkpoint thread should do extra work, which sometimes could be as
> > > > huge as copying the whole partition.
> > > >
> > > >
> > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > ilya.kasnacheev@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello!
> > > >>
> > > >> This proposal will also happily break my compression-with-dictionary
> > > >> patch, since it currently relies on only having local dictionaries.
> > > >>
> > > >> However, when you have compressed data, maybe the speed boost from your
> > > >> approach is even greater.
> > > >>
> > > >> Regards,
> > > >> --
> > > >> Ilya Kasnacheev
> > > >>
> > > >>
> > > >> On Fri, 23 Nov 2018 at 13:08, Maxim Muzafarov <ma...@gmail.com> wrote:
> > > >>
> > > >> > Igniters,
> > > >> >
> > > >> >
> > > >> > I'd like to take the next step toward increasing the rebalance speed
> > > >> > of Apache Ignite with persistence enabled. Currently, the rebalancing
> > > >> > procedure doesn't utilize the network and storage device throughput to
> > > >> > its full extent, even with meaningful values of the
> > > >> > rebalanceThreadPoolSize property. As part of the previous discussion
> > > >> > `How to make rebalance faster` [1] and IEP-16 [2], Ilya proposed an
> > > >> > idea [3] of transferring cache partition files over the network.
> > > >> > From my point of view, the case where this type of rebalancing
> > > >> > procedure can bring the most benefit is adding a completely new node
> > > >> > or a set of new nodes to the cluster. Such a scenario implies full
> > > >> > relocation of cache partition files to the new node. To roughly
> > > >> > estimate the superiority of partition file transfer over the network,
> > > >> > the native Linux scp/rsync commands can be used. My test environment
> > > >> > showed the new approach reaching 270 MB/s vs the current 40 MB/s
> > > >> > single-threaded rebalance speed.
> > > >> >
> > > >> >
> > > >> > I've prepared the design document IEP-28 [4] and accumulated all the
> > > >> > process details of the new rebalance approach on that page. Below you
> > > >> > can find the most significant details of the new rebalance procedure
> > > >> > and the components of Apache Ignite which are proposed to be changed.
> > > >> >
> > > >> > Any feedback is very much appreciated.
> > > >> >
> > > >> >
> > > >> > *PROCESS OVERVIEW*
> > > >> >
> > > >> > The whole process is described in terms of rebalancing a single cache
> > > >> > group; partition files are rebalanced one by one:
> > > >> >
> > > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > > >> > supplier node;
> > > >> > 2. When the supplier node receives the GridDhtPartitionDemandMessage,
> > > >> > it starts a new checkpoint process;
> > > >> > 3. The supplier node creates an empty temporary cache partition file
> > > >> > with the .tmp postfix in the same cache persistence directory;
> > > >> > 4. The supplier node splits the whole cache partition file into
> > > >> > virtual chunks of a predefined size (a multiple of the PageMemory page
> > > >> > size);
> > > >> > 4.1. If a concurrent checkpoint thread determines the appropriate
> > > >> > cache partition file chunk and tries to flush a dirty page to the
> > > >> > cache partition file:
> > > >> > 4.1.1. If the rebalance chunk has already been transferred
> > > >> > 4.1.1.1. Flush the dirty page to the file;
> > > >> > 4.1.2. If the rebalance chunk has not been transferred yet
> > > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > >> > 4.1.2.2. Flush the dirty page to the file;
> > > >> > 4.2. The node starts sending each cache partition file chunk to the
> > > >> > demander node one by one using FileChannel#transferTo:
> > > >> > 4.2.1. If the current chunk was modified by the checkpoint thread,
> > > >> > read it from the temporary cache partition file;
> > > >> > 4.2.2. If the current chunk is untouched, read it from the original
> > > >> > cache partition file;
> > > >> > 5. The demander node starts listening for new incoming pipe
> > > >> > connections from the supplier node on TcpCommunicationSpi;
> > > >> > 6. The demander node creates the temporary cache partition file with
> > > >> > the .tmp postfix in the same cache persistence directory;
> > > >> > 7. The demander node receives each cache partition file chunk one by
> > > >> > one (see the sketch after this list):
> > > >> > 7.1. The node checks the CRC for each page in the downloaded chunk;
> > > >> > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > > >> > partition file position;
> > > >> > 8. When the demander node has received the whole cache partition file:
> > > >> > 8.1. The node initializes the received .tmp file as its appropriate
> > > >> > cache partition file;
> > > >> > 8.2. A thread per partition begins to apply data entries from the
> > > >> > beginning of the temporary WAL storage;
> > > >> > 8.3. All async operations corresponding to this partition file still
> > > >> > write to the end of the temporary WAL;
> > > >> > 8.4. Once the temporary WAL storage is about to become empty:
> > > >> > 8.4.1. Start the first checkpoint;
> > > >> > 8.4.2. Wait for the first checkpoint to end and own the cache
> > > >> > partition;
> > > >> > 8.4.3. All operations are now switched to the partition file instead
> > > >> > of writing to the temporary WAL;
> > > >> > 8.4.4. Schedule the temporary WAL storage deletion;
> > > >> > 9. The supplier node deletes the temporary cache partition file;
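> > > >> >
> > > >> > A rough demander-side sketch of steps 7.1-7.2: check a per-page CRC in
> > > >> > the received chunk and flush the chunk at its position in the .tmp
> > > >> > partition file. The CRC placement and sizes are assumptions for
> > > >> > illustration, not the real Ignite page format:
> > > >> >
> > > >> > import java.io.IOException;
> > > >> > import java.nio.ByteBuffer;
> > > >> > import java.nio.channels.FileChannel;
> > > >> > import java.util.zip.CRC32;
> > > >> >
> > > >> > /** Verifies each page of a received chunk and writes the chunk at its file offset. */
> > > >> > static void applyChunk(FileChannel tmpPartFile, ByteBuffer chunk, long chunkOff, int pageSize)
> > > >> >     throws IOException {
> > > >> >     for (int off = 0; off + pageSize <= chunk.limit(); off += pageSize) {
> > > >> >         // Assumption for the sketch: the last 4 bytes of each page hold its CRC.
> > > >> >         int storedCrc = chunk.getInt(off + pageSize - 4);
> > > >> >
> > > >> >         ByteBuffer payload = chunk.duplicate();
> > > >> >         payload.position(off).limit(off + pageSize - 4);
> > > >> >
> > > >> >         CRC32 crc = new CRC32();
> > > >> >         crc.update(payload);
> > > >> >
> > > >> >         if ((int)crc.getValue() != storedCrc)
> > > >> >             throw new IOException("CRC mismatch in page at chunk offset " + off);
> > > >> >     }
> > > >> >
> > > >> >     // Step 7.2: flush the verified chunk at the appropriate partition file position.
> > > >> >     chunk.rewind();
> > > >> >     tmpPartFile.write(chunk, chunkOff);
> > > >> > }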
> > > >> >
> > > >> >
> > > >> > *COMPONENTS TO CHANGE*
> > > >> >
> > > >> > CommunicationSpi
> > > >> >
> > > >> > To benefit from zero copy we must delegate the file transfer to
> > > >> > FileChannel#transferTo(long, long,
> > > >> > java.nio.channels.WritableByteChannel), because the fast path of the
> > > >> > transferTo method is only executed if the destination channel inherits
> > > >> > from an internal JDK class.
> > > >> >
> > > >> > Preloader
> > > >> >
> > > >> > A new implementation of the cache entries preloader is assumed to be
> > > >> > done. The new implementation must send and receive cache partition
> > > >> > files over the CommunicationSpi channels in chunks of data, with
> > > >> > validation of the received items. The new layer over the cache
> > > >> > partition file must support direct use of the FileChannel#transferTo
> > > >> > method over the CommunicationSpi pipe connection. The connection
> > > >> > bandwidth of the cache partition file transfer must have the ability
> > > >> > to be limited at runtime (a rate-limiting sketch is given below).
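> > > >> >
> > > >> > For the runtime bandwidth limit, a simple throttle could look like the
> > > >> > sketch below (purely illustrative; a single sender thread is assumed,
> > > >> > and the real limiter would be reconfigurable through Ignite
> > > >> > configuration or JMX):
> > > >> >
> > > >> > import java.util.concurrent.atomic.AtomicLong;
> > > >> > import java.util.concurrent.locks.LockSupport;
> > > >> >
> > > >> > /** Parks the sender thread when it runs ahead of the configured transfer rate. */
> > > >> > public class TransferRateLimiter {
> > > >> >     private final AtomicLong bytesPerSec = new AtomicLong();
> > > >> >     private final long startNanos = System.nanoTime();
> > > >> >     private long sentBytes; // accessed by the single sender thread only
> > > >> >
> > > >> >     public TransferRateLimiter(long bytesPerSec) {
> > > >> >         this.bytesPerSec.set(bytesPerSec);
> > > >> >     }
> > > >> >
> > > >> >     /** Changes the limit at runtime. */
> > > >> >     public void limit(long newBytesPerSec) {
> > > >> >         bytesPerSec.set(newBytesPerSec);
> > > >> >     }
> > > >> >
> > > >> >     /** Called after each sent chunk; sleeps until the average rate fits the limit. */
> > > >> >     public void acquire(long chunkBytes) {
> > > >> >         sentBytes += chunkBytes;
> > > >> >
> > > >> >         long expectedMillis = sentBytes * 1000L / bytesPerSec.get();
> > > >> >         long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
> > > >> >         long aheadMillis = expectedMillis - elapsedMillis;
> > > >> >
> > > >> >         if (aheadMillis > 0)
> > > >> >             LockSupport.parkNanos(aheadMillis * 1_000_000L);
> > > >> >     }
> > > >> > }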
> > > >> >
> > > >> > Checkpointer
> > > >> >
> > > >> > When the supplier node receives the cache partition file demand
> > > >> > request, it will send the file over the CommunicationSpi. The cache
> > > >> > partition file can be concurrently updated by a checkpoint thread
> > > >> > during its transmission. To guarantee file consistency, the
> > > >> > Checkpointer must use a copy-on-write technique and save a copy of the
> > > >> > updated chunk into the temporary file (see the sketch below).
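> > > >> >
> > > >> > A rough sketch of that copy-on-write hook in the checkpoint write path
> > > >> > (the chunk bookkeeping, transferredWatermark and both file channels are
> > > >> > illustrative; this is not the actual Checkpointer code):
> > > >> >
> > > >> > import java.io.IOException;
> > > >> > import java.nio.ByteBuffer;
> > > >> > import java.nio.channels.FileChannel;
> > > >> > import java.util.BitSet;
> > > >> >
> > > >> > /** Preserves the original bytes of a not-yet-sent chunk before it is overwritten. */
> > > >> > public class CopyOnWriteInterceptor {
> > > >> >     private final FileChannel partFile; // original partition file
> > > >> >     private final FileChannel tmpFile;  // temporary .tmp copy for modified chunks
> > > >> >     private final BitSet savedChunks = new BitSet();
> > > >> >     private final int chunkSize;
> > > >> >     private volatile long transferredWatermark; // bytes already sent to the demander
> > > >> >
> > > >> >     public CopyOnWriteInterceptor(FileChannel partFile, FileChannel tmpFile, int chunkSize) {
> > > >> >         this.partFile = partFile;
> > > >> >         this.tmpFile = tmpFile;
> > > >> >         this.chunkSize = chunkSize;
> > > >> >     }
> > > >> >
> > > >> >     /** Called by the checkpointer right before flushing a dirty page at the given offset. */
> > > >> >     public synchronized void beforePageWrite(long pageOff) throws IOException {
> > > >> >         int chunkIdx = (int)(pageOff / chunkSize);
> > > >> >         long chunkOff = (long)chunkIdx * chunkSize;
> > > >> >
> > > >> >         // Chunks already sent may be overwritten in place; unsent ones are saved once.
> > > >> >         if (chunkOff >= transferredWatermark && !savedChunks.get(chunkIdx)) {
> > > >> >             ByteBuffer buf = ByteBuffer.allocate(chunkSize);
> > > >> >
> > > >> >             partFile.read(buf, chunkOff);
> > > >> >             buf.flip();
> > > >> >             tmpFile.write(buf, chunkOff);
> > > >> >
> > > >> >             savedChunks.set(chunkIdx);
> > > >> >         }
> > > >> >     }
> > > >> >
> > > >> >     /** Updated by the sender as it finishes transferring chunks. */
> > > >> >     public void onChunkTransferred(long newWatermark) {
> > > >> >         transferredWatermark = newWatermark;
> > > >> >     }
> > > >> > }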
> > > >> >
> > > >> > (new) Catch-up temporary WAL
> > > >> >
> > > >> > While the demander node is in the partition file transmission state,
> > > >> > it must save all cache entries corresponding to the moving partition
> > > >> > into a new temporary WAL storage. These entries will later be applied
> > > >> > one by one to the received cache partition file. All asynchronous
> > > >> > operations will be appended to the end of the temporary WAL storage
> > > >> > while it is being read, until it becomes fully read. A file-based FIFO
> > > >> > approach is assumed to be used by this process (sketched below).
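> > > >> >
> > > >> > A minimal sketch of such a file-based FIFO with length-prefixed records
> > > >> > (a single writer and a single reader are assumed; the record encoding
> > > >> > is illustrative, not the real WAL record format):
> > > >> >
> > > >> > import java.io.IOException;
> > > >> > import java.nio.ByteBuffer;
> > > >> > import java.nio.channels.FileChannel;
> > > >> > import java.nio.file.Path;
> > > >> > import java.nio.file.StandardOpenOption;
> > > >> >
> > > >> > /** Appends length-prefixed records to the tail and reads them back from the head. */
> > > >> > public class FileFifo implements AutoCloseable {
> > > >> >     private final FileChannel ch;
> > > >> >     private long readPos;  // head position
> > > >> >     private long writePos; // tail position
> > > >> >
> > > >> >     public FileFifo(Path file) throws IOException {
> > > >> >         ch = FileChannel.open(file, StandardOpenOption.CREATE,
> > > >> >             StandardOpenOption.READ, StandardOpenOption.WRITE);
> > > >> >     }
> > > >> >
> > > >> >     /** Enqueues a serialized cache entry at the end of the storage. */
> > > >> >     public synchronized void append(byte[] record) throws IOException {
> > > >> >         ByteBuffer buf = ByteBuffer.allocate(4 + record.length);
> > > >> >         buf.putInt(record.length).put(record).flip();
> > > >> >
> > > >> >         writePos += ch.write(buf, writePos);
> > > >> >     }
> > > >> >
> > > >> >     /** Dequeues the next record, or null when the storage has been fully read. */
> > > >> >     public synchronized byte[] poll() throws IOException {
> > > >> >         if (readPos >= writePos)
> > > >> >             return null; // fully read: operations can switch to the partition file
> > > >> >
> > > >> >         ByteBuffer len = ByteBuffer.allocate(4);
> > > >> >         ch.read(len, readPos);
> > > >> >         len.flip();
> > > >> >
> > > >> >         ByteBuffer rec = ByteBuffer.allocate(len.getInt());
> > > >> >         ch.read(rec, readPos + 4);
> > > >> >         readPos += 4 + rec.capacity();
> > > >> >
> > > >> >         return rec.array();
> > > >> >     }
> > > >> >
> > > >> >     @Override public void close() throws IOException {
> > > >> >         ch.close();
> > > >> >     }
> > > >> > }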
> > > >> >
> > > >> >
> > > >> > *RECOVERY*
> > > >> >
> > > >> > In case of crash recovery, no additional actions need to be applied
> > > >> > to keep the cache partition file consistent. We do not recover a
> > > >> > partition in the moving state, so that single partition file, and only
> > > >> > it, will be lost. Its uniqueness is guaranteed by the
> > > >> > single-file-transmission process. The cache partition file will be
> > > >> > fully loaded on the next rebalance procedure.
> > > >> >
> > > >> > To provide the default cluster recovery guarantee we must:
> > > >> > 1. Start the checkpoint process when the temporary WAL storage
> > > >> > becomes empty;
> > > >> > 2. Wait for the first checkpoint to end and set the owning status on
> > > >> > the cache partition;
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > [1] http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > >> > [2] https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > >> > [4] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > >> >
> > > >>
> > > >
> >