Posted to dev@ignite.apache.org by Maxim Muzafarov <ma...@gmail.com> on 2018/11/23 10:07:52 UTC

[DISCUSSION] Design document. Rebalance caches by transferring partition files

Igniters,


I'd like to take the next step towards increasing the rebalance speed
of Apache Ignite with persistence enabled. Currently, the rebalancing
procedure doesn't utilize the network and storage device throughput to
its full extent, even with reasonably large values of the
rebalanceThreadPoolSize property. As part of the previous discussion
`How to make rebalance faster` [1] and IEP-16 [2], Ilya proposed an
idea [3] of transferring cache partition files over the network.
From my point of view, the case where this type of rebalancing
procedure brings the most benefit is adding a completely new node or a
set of new nodes to the cluster. Such a scenario implies full
relocation of cache partition files to the new node. To roughly
estimate the advantage of transferring partition files over the
network, the native Linux scp/rsync commands can be used. My test
environment showed 270 MB/s for the new approach vs the current
40 MB/s single-threaded rebalance speed.


I've prepared the design document IEP-28 [4] and accumulated all the
process details of the new rebalance approach on that page. Below you
can find the most significant details of the new rebalance procedure
and the components of Apache Ignite which are proposed to be changed.

Any feedback is greatly appreciated.


*PROCESS OVERVIEW*

The whole process is described in terms of rebalancing a single cache
group; partition files are rebalanced one by one:

1. The demander node sends a GridDhtPartitionDemandMessage to the
supplier node;
2. The supplier node receives the GridDhtPartitionDemandMessage and
starts a new checkpoint process;
3. The supplier node creates an empty temporary cache partition file
with the .tmp postfix in the same cache persistence directory;
4. The supplier node splits the whole cache partition file into
virtual chunks of a predefined size (a multiple of the PageMemory page size);
4.1. If a concurrent checkpoint thread needs to flush a dirty page
into a chunk of the cache partition file
4.1.1. If the chunk has already been transferred
4.1.1.1. Flush the dirty page to the file;
4.1.2. If the chunk has not been transferred yet
4.1.2.1. Write this chunk to the temporary cache partition file;
4.1.2.2. Flush the dirty page to the file;
4.2. The supplier node starts sending the cache partition file chunks
to the demander node one by one using FileChannel#transferTo
4.2.1. If the current chunk was modified by the checkpoint thread – read
it from the temporary cache partition file;
4.2.2. If the current chunk has not been touched – read it from the original
cache partition file;
5. The demander node starts listening for new incoming pipe connections
from the supplier node on TcpCommunicationSpi;
6. The demander node creates a temporary cache partition file with
the .tmp postfix in the same cache persistence directory;
7. The demander node receives each cache partition file chunk one by one
7.1. The node checks the CRC of each page in the downloaded chunk;
7.2. The node flushes the downloaded chunk at the appropriate cache
partition file position;
8. When the demander node has received the whole cache partition file
8.1. The node initializes the received .tmp file as the appropriate cache
partition file;
8.2. A per-partition thread begins to apply data entries from the
beginning of the temporary WAL storage;
8.3. All async operations corresponding to this partition file still
write to the end of the temporary WAL;
8.4. Once the temporary WAL storage has been fully drained
8.4.1. Start the first checkpoint;
8.4.2. Wait for the first checkpoint to finish and own the cache partition;
8.4.3. All operations are now switched to the partition file instead
of the temporary WAL;
8.4.4. Schedule the temporary WAL storage deletion;
9. The supplier node deletes the temporary cache partition file;


*COMPONENTS TO CHANGE*

CommunicationSpi

To benefit from zero copy we must delegate the file transfer to
FileChannel#transferTo(long, long,
java.nio.channels.WritableByteChannel), because the fast path of the
transferTo method is only taken if the destination channel is an
instance of an internal JDK class.
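
For illustration, a rough sketch of such a transfer loop (the class and
method names below are hypothetical, not the actual Ignite API):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Sketch: zero-copy transfer of a partition file over a socket channel. */
public class PartitionFileSender {
    /** transferTo may copy fewer bytes than requested, so loop until done. */
    public static void send(Path partFile, SocketChannel out) throws IOException {
        try (FileChannel src = FileChannel.open(partFile, StandardOpenOption.READ)) {
            long pos = 0;
            long size = src.size();

            while (pos < size)
                pos += src.transferTo(pos, size - pos, out);
        }
    }
}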

Preloader

A new implementation of the cache entries preloader is assumed to be
done. The new implementation must send and receive cache partition
files over the CommunicationSpi channels in chunks of data, validating
the received items. The new layer over the cache partition file must
support direct use of the FileChannel#transferTo method over the
CommunicationSpi pipe connection. The connection bandwidth of the
cache partition file transfer must have the ability to be limited at
runtime.
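
As an illustration only, the demander-side receive path could look roughly
like the sketch below (the chunk layout, CRC location and all names are
assumptions, not the actual implementation):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.zip.CRC32;

/** Sketch: receive one partition file chunk and flush it at the right offset. */
public class PartitionFileReceiver {
    /** Page size in bytes; the chunk size is assumed to be a multiple of it. */
    private final int pageSize;

    public PartitionFileReceiver(int pageSize) {
        this.pageSize = pageSize;
    }

    /** Reads {@code chunkSize} bytes from {@code in} and writes them to {@code dest} at {@code off}. */
    public void receiveChunk(ReadableByteChannel in, FileChannel dest, long off,
        int chunkSize) throws IOException {
        ByteBuffer chunk = ByteBuffer.allocate(chunkSize);

        while (chunk.hasRemaining()) {
            if (in.read(chunk) < 0)
                throw new IOException("Connection closed before the chunk was fully read");
        }

        chunk.flip();

        // Validate every page of the chunk before flushing it to disk.
        CRC32 crc = new CRC32();

        for (int p = 0; p < chunkSize / pageSize; p++) {
            crc.reset();
            crc.update(chunk.array(), p * pageSize, pageSize);
            // ... compare crc.getValue() with the CRC stored in the page header.
        }

        while (chunk.hasRemaining())
            dest.write(chunk, off + chunk.position());
    }
}

The runtime bandwidth limit mentioned above could be a thin wrapper around the
channel that pauses reads once a per-second byte budget is exhausted.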

Checkpointer

When the supplier node receives the cache partition file demand
request it will send the file over the CommunicationSpi. The cache
partition file can be concurrently updated by the checkpoint thread
during its transmission. To guarantee the file consistency, the
Checkpointer must use a copy-on-write technique and save a copy of the
updated chunk into the temporary file.
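
A minimal sketch of the copy-on-write check a checkpoint thread could make
before flushing a dirty page (it ignores the synchronization with the sender
and uses hypothetical names, so it is an illustration rather than the design):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: preserve a not-yet-transferred chunk before a dirty page overwrites it. */
public class CopyOnWriteCheckpointer {
    /** Offset in the partition file up to which chunks have already been sent. */
    private final AtomicLong transferredBytes = new AtomicLong();

    /** Chunks already preserved in the temporary file (copied at most once). */
    private final Set<Long> copiedChunks = ConcurrentHashMap.newKeySet();

    private final int chunkSize;

    public CopyOnWriteCheckpointer(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    /** Called by the checkpoint thread before it writes a dirty page at {@code pageOff}. */
    public void beforePageFlush(FileChannel partFile, FileChannel tmpFile,
        long pageOff) throws IOException {
        long chunkIdx = pageOff / chunkSize;
        long chunkStart = chunkIdx * chunkSize;

        // The chunk is not sent yet and was not preserved before - save its current content first.
        if (chunkStart >= transferredBytes.get() && copiedChunks.add(chunkIdx)) {
            ByteBuffer buf = ByteBuffer.allocate(chunkSize);

            while (buf.hasRemaining() && partFile.read(buf, chunkStart + buf.position()) > 0) {
                // Keep reading until the whole chunk is in the buffer.
            }

            buf.flip();

            while (buf.hasRemaining())
                tmpFile.write(buf, chunkStart + buf.position());
        }
        // After this point the dirty page can be flushed to the partition file as usual.
    }
}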

(new) Catch-up temporary WAL

While the demander node is in the partition file transmission state
it must save all cache entries corresponding to the moving partition
into a new temporary WAL storage. These entries will be applied later
one by one to the received cache partition file. All asynchronous
operations will be appended to the end of the temporary WAL storage
while it is being read, until it becomes fully read. A file-based FIFO
approach is assumed to be used by this process.
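
A tiny sketch of the file-based FIFO idea (record serialization is out of
scope here and all names are hypothetical):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Sketch: append-only file used as a FIFO queue of catch-up records. */
public class TempWalStorage implements AutoCloseable {
    private final FileChannel ch;

    /** Next read position; records before it have already been applied. */
    private long readPos;

    public TempWalStorage(Path file) throws IOException {
        ch = FileChannel.open(file, StandardOpenOption.CREATE,
            StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    /** Appends a serialized cache entry record to the tail (async cache operations). */
    public synchronized void append(ByteBuffer rec) throws IOException {
        ByteBuffer len = ByteBuffer.allocate(4).putInt(rec.remaining());
        len.flip();

        while (len.hasRemaining())
            ch.write(len);

        while (rec.hasRemaining())
            ch.write(rec);
    }

    /** Polls the next record from the head, or {@code null} once the storage is fully read. */
    public synchronized ByteBuffer poll() throws IOException {
        if (readPos >= ch.size())
            return null; // Fully drained - time to run the first checkpoint and own the partition.

        ByteBuffer len = ByteBuffer.allocate(4);

        while (len.hasRemaining())
            ch.read(len, readPos + len.position());

        len.flip();

        ByteBuffer rec = ByteBuffer.allocate(len.getInt());

        while (rec.hasRemaining())
            ch.read(rec, readPos + 4 + rec.position());

        readPos += 4 + rec.capacity();
        rec.flip();

        return rec;
    }

    @Override public void close() throws IOException {
        ch.close();
    }
}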


*RECOVERY*

In the case of crash recovery, no additional actions need to be
applied to keep the cache partition files consistent. We do not
recover a partition in the MOVING state, so only the single partition
file being transferred will be lost, and nothing else. This is
guaranteed by the single-file-transmission process. The cache
partition file will be fully loaded on the next rebalance procedure.

To provide the default cluster recovery guarantees we must:
1. Start the checkpoint process when the temporary WAL storage becomes empty;
2. Wait for the first checkpoint to finish and set the owning status on the
cache partition;




[1] http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
[2] https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
[3] https://issues.apache.org/jira/browse/IGNITE-8020
[4] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Eduard Shangareev <ed...@gmail.com>.
Sergey,

> If I understand correctly when there is a continuous flow of updates to the
> page already transferred to receiver checkpointer will write this page to
> the log file over and over again. Do you see here any risks of exhausting
> disk space on sender's side?

We could track the set of pages which are already in the log file to avoid
this issue (any concurrent hash set would work fine).
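
Just to illustrate the idea, a tiny sketch of such tracking (names are
hypothetical):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: remember which pages were already written to the log file. */
public class LoggedPages {
    private final Set<Long> pageIds = ConcurrentHashMap.newKeySet();

    /** @return {@code true} if the page still has to be written to the log file. */
    public boolean markLogged(long pageId) {
        return pageIds.add(pageId);
    }
}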

> What if some updates come after checkpointer stopped updating log file? How
> these updates will be transferred to the receiver and applied there?

The temporary WAL file on the receiver from the original approach should cover
this case.

On Tue, Nov 27, 2018 at 7:19 PM Sergey Chugunov <se...@gmail.com>
wrote:

> Eduard,
>
> This algorithm looks much easier but could you clarify some edge cases
> please?
>
> If I understand correctly when there is a continuous flow of updates to the
> page already transferred to receiver checkpointer will write this page to
> the log file over and over again. Do you see here any risks of exhausting
> disk space on sender's side?
>
> What if some updates come after checkpointer stopped updating log file? How
> these updates will be transferred to the receiver and applied there?
>
> On Tue, Nov 27, 2018 at 7:52 PM Eduard Shangareev <
> eduard.shangareev@gmail.com> wrote:
>
> > So, after some discussion, I could describe another approach on how to
> > build consistent partition on the fly.
> >
> > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > 2. After checkpoint finish, we start sending partition file (without any
> > lock) to the receiver from 0 to fixed size.
> > 3. Next checkpoints if they detect that they would override some pages of
> > transferring file should write the previous state of a page to a
> dedicated
> > file.
> > So, we would have a list of pages written 1 by 1, page id is written in
> the
> > page itself so we could determine page index. Let's name it log.
> > 4. When transfer finished checkpointer would stop updating log-file. Now
> we
> > are ready to send it to the receiver.
> > 5. On receiver side we start merging the dirty partition file with log
> > (updating it with pages from log-file).
> >
> > So, an advantage of this method:
> > - checkpoint-thread work couldn't  increase more than twice;
> > - checkpoint-threads shouldn't wait for anything;
> > - in best case, we receive partition without any extra effort.
> >
> >
> > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > eduard.shangareev@gmail.com> wrote:
> >
> > > Maxim,
> > >
> > > I have looked through your algorithm of reading partition consistently.
> > > And I have some questions/comments.
> > >
> > > 1. The algorithm requires heavy synchronization between
> checkpoint-thread
> > > and new-approach-rebalance-threads,
> > > because you need strong guarantees to not start writing or reading to
> > > chunk which was updated or started reading by the counterpart.
> > >
> > > 2. Also, if we have started transferring this chunk in original
> partition
> > > couldn't be updated by checkpoint-threads. They should wait for
> transfer
> > > finishing.
> > >
> > > 3. If sending is slow and partition is updated then in worst case
> > > checkpoint-threads would create the whole copy of the partition.
> > >
> > > So, what we have:
> > > -on every page write checkpoint-thread should synchronize with
> > > new-approach-rebalance-threads;
> > > -checkpoint-thread should do extra-work, sometimes this could be as
> huge
> > > as copying the whole partition.
> > >
> > >
> > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > ilya.kasnacheev@gmail.com>
> > > wrote:
> > >
> > >> Hello!
> > >>
> > >> This proposal will also happily break my compression-with-dictionary
> > patch
> > >> since it relies currently on only having local dictionaries.
> > >>
> > >> However, when you have compressed data, maybe speed boost is even
> > greater
> > >> with your approach.
> > >>
> > >> Regards,
> > >> --
> > >> Ilya Kasnacheev
> > >>
> > >>

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Sergey Chugunov <se...@gmail.com>.
Eduard,

This algorithm looks much easier but could you clarify some edge cases
please?

If I understand correctly when there is a continuous flow of updates to the
page already transferred to receiver checkpointer will write this page to
the log file over and over again. Do you see here any risks of exhausting
disk space on sender's side?

What if some updates come after checkpointer stopped updating log file? How
these updates will be transferred to the receiver and applied there?

On Tue, Nov 27, 2018 at 7:52 PM Eduard Shangareev <
eduard.shangareev@gmail.com> wrote:

> So, after some discussion, I could describe another approach on how to
> build consistent partition on the fly.
>
> 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> 2. After checkpoint finish, we start sending partition file (without any
> lock) to the receiver from 0 to fixed size.
> 3. Next checkpoints if they detect that they would override some pages of
> transferring file should write the previous state of a page to a dedicated
> file.
> So, we would have a list of pages written 1 by 1, page id is written in the
> page itself so we could determine page index. Let's name it log.
> 4. When transfer finished checkpointer would stop updating log-file. Now we
> are ready to send it to the receiver.
> 5. On receiver side we start merging the dirty partition file with log
> (updating it with pages from log-file).
>
> So, an advantage of this method:
> - checkpoint-thread work couldn't  increase more than twice;
> - checkpoint-threads shouldn't wait for anything;
> - in best case, we receive partition without any extra effort.
>
>
> On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> eduard.shangareev@gmail.com> wrote:
>
> > Maxim,
> >
> > I have looked through your algorithm of reading partition consistently.
> > And I have some questions/comments.
> >
> > 1. The algorithm requires heavy synchronization between checkpoint-thread
> > and new-approach-rebalance-threads,
> > because you need strong guarantees to not start writing or reading to
> > chunk which was updated or started reading by the counterpart.
> >
> > 2. Also, if we have started transferring this chunk in original partition
> > couldn't be updated by checkpoint-threads. They should wait for transfer
> > finishing.
> >
> > 3. If sending is slow and partition is updated then in worst case
> > checkpoint-threads would create the whole copy of the partition.
> >
> > So, what we have:
> > -on every page write checkpoint-thread should synchronize with
> > new-approach-rebalance-threads;
> > -checkpoint-thread should do extra-work, sometimes this could be as huge
> > as copying the whole partition.
> >
> >
> > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> ilya.kasnacheev@gmail.com>
> > wrote:
> >
> >> Hello!
> >>
> >> This proposal will also happily break my compression-with-dictionary
> patch
> >> since it relies currently on only having local dictionaries.
> >>
> >> However, when you have compressed data, maybe speed boost is even
> greater
> >> with your approach.
> >>
> >> Regards,
> >> --
> >> Ilya Kasnacheev
> >>
> >>

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Maxim Muzafarov <ma...@gmail.com>.
Igniters,


Below you can find measurements and profiling results for the rebalancing
procedure with a single persisted cache (persisted on SSD drives).
I've posted other results with different hardware configurations on the
confluence page [1] within cpu/disk/network utilization graphs. Only the most
important numbers are left here.


batches: 146938
rows: 77321844
rows per batch: 526

time (total): 20 min
cache size: 78055 MB
rebalance speed: 63 MB/sec
rows per sec: 62965 rows
batch per sec: 119 batches


+ cache rebalance total: 100.00%
+ + preload on demander: 95.70%
+ + + offheap().invoke(..): 76.22%
+ + + + dataTree.invoke(..): 72.96%
+ + + + + BPlusTree.invokeDown(..): 11.98% <<<--!
+ + + + + FreeList.insertDataRow(..): 57.65% <<<--!
+ + + wal().log(..): 11.28%
+ message serialization: 0.13%
+ network delay between messages (total): 1.23%
+ make batch on supplier handleDemandMessage(..): 19.85%


[1] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
>
> Maxim,
>
> Regarding MVCC - this is essentially a copy-on-write approach. New entry is
> created on every update. They are cleaned asynchronously by dedicated
> threads (aka "vacuum").
>
> I looked at the document you mentioned, thank you for pointing to it. But
> it doesn't answer all questions around existing design, and what I am
> trying to do is to get how deep do we understand current problems. It is
> very true that various subsystems, such as buffer managers, WALs,
> supporting structures, etc. incur very serious overhead. And when it comes
> to heavy operations implementors typically seek for a way to bypass as much
> components as possible, taking into account that different shortcuts lead to
> different types of side effects. And IMO our very important goal for now is
> to create space of possible improvements, and estimate their costs, risks
> and applicability for product's configuration space.
>
> Let me clarify several questions about the current rebalance implementation, as
> I am not a big expert here.
> 1) Is it correct that supplier sends only one message for every demand
> message? If yes, then streaming should improve network utilization a lot.
> 2) Is it correct that for user caches we process supply messages in a
> system pool? Did we consider moving it to striped pool? Because if all
> operations on a single partition is ordered, we may apply a number of
> critical optimizations - bypassing page cache and checkpointer for new
> entries, batched index updates, batched free list updates, etc.
> 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> conditions when it could be disabled on supplier side?
> 4) Most important - have we tried to profile plain single-threaded
> rebalance without concurrent write load? We need to have clear
> understanding on where time is spent - supplier/demander, cpu/network/disk,
> etc. Some Java tracing code should help.
>
> And one question regarding proposed implementation - how are we going to
> handle secondary indexes?
>
> [1] https://issues.apache.org/jira/browse/IGNITE-8017
>
>
> On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
>
> > Eduard,
> >
> > Thank you very much for the discussion!
> >
> > Your algorithm looks much better for me too and easier to implement.
> > I'll update appropriate process points on IEP page of the proposed
> > rebalance procedure.
> > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > <ed...@gmail.com> wrote:
> > >
> > > So, after some discussion, I could describe another approach on how to
> > > build consistent partition on the fly.
> > >
> > > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > > 2. After checkpoint finish, we start sending partition file (without any
> > > lock) to the receiver from 0 to fixed size.
> > > 3. Next checkpoints if they detect that they would override some pages of
> > > transferring file should write the previous state of a page to a
> > dedicated
> > > file.
> > > So, we would have a list of pages written 1 by 1, page id is written in
> > the
> > > page itself so we could determine page index. Let's name it log.
> > > 4. When transfer finished checkpointer would stop updating log-file. Now
> > we
> > > are ready to send it to the receiver.
> > > 5. On receiver side we start merging the dirty partition file with log
> > > (updating it with pages from log-file).
> > >
> > > So, an advantage of this method:
> > > - checkpoint-thread work couldn't  increase more than twice;
> > > - checkpoint-threads shouldn't wait for anything;
> > > - in best case, we receive partition without any extra effort.
> > >
> > >
> > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > eduard.shangareev@gmail.com> wrote:
> > >
> > > > Maxim,
> > > >
> > > > I have looked through your algorithm of reading partition consistently.
> > > > And I have some questions/comments.
> > > >
> > > > 1. The algorithm requires heavy synchronization between
> > checkpoint-thread
> > > > and new-approach-rebalance-threads,
> > > > because you need strong guarantees to not start writing or reading to
> > > > chunk which was updated or started reading by the counterpart.
> > > >
> > > > 2. Also, if we have started transferring this chunk in original
> > partition
> > > > couldn't be updated by checkpoint-threads. They should wait for
> > transfer
> > > > finishing.
> > > >
> > > > 3. If sending is slow and partition is updated then in worst case
> > > > checkpoint-threads would create the whole copy of the partition.
> > > >
> > > > So, what we have:
> > > > -on every page write checkpoint-thread should synchronize with
> > > > new-approach-rebalance-threads;
> > > > -checkpoint-thread should do extra-work, sometimes this could be as
> > huge
> > > > as copying the whole partition.
> > > >
> > > >
> > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > ilya.kasnacheev@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello!
> > > >>
> > > >> This proposal will also happily break my compression-with-dictionary
> > patch
> > > >> since it relies currently on only having local dictionaries.
> > > >>
> > > >> However, when you have compressed data, maybe speed boost is even
> > greater
> > > >> with your approach.
> > > >>
> > > >> Regards,
> > > >> --
> > > >> Ilya Kasnacheev
> > > >>
> > > >>

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Nikolay Izhikov <ni...@apache.org>.
Maxim, thanks!

On Wed, 14/08/2019 at 18:26 +0300, Maxim Muzafarov wrote:
> Nikolay,
> 
> In my message above I've described only internal local BackupManager
> for the rebalance needs, but for the backup feature of the whole
> Ignite cluster I also have some thoughts. I'll give you a detailed
> answer in an appropriate discussion topic [1] a bit later.
> 
> [1] http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html
> 
> On Wed, 14 Aug 2019 at 16:40, Nikolay Izhikov <ni...@apache.org> wrote:
> > 
> > Hello, Maxim.
> > 
> > I think backup is a great feature for Ignite.
> > Let's have it!
> > 
> > Few notes for it:
> > 
> > 1. Backup directory should be taken from node configuration.
> > 
> > 2. Backup should be stored on local node only.
> > An Ignite admin can write an sh script to move all backed-up partitions to one storage by himself.
> > 
> > 3. Ignite should provide CLI tools to start backup/restore procedure.
> > 
> > Questions:
> > 
> > 1. How would each backup be identified?
> > 2. Do you plan to implement backup of a cache or a cache group?
> > 3. How would the restore process be implemented from the user's point of view?
> >         Can we interact with cache during restore?
> > 
> > On Wed, 14/08/2019 at 16:13 +0300, Maxim Muzafarov wrote:
> > > Igniters,
> > > 
> > > 
> > > Since the file transmission between Ignite nodes [2] has been merged
> > > to the master branch (it is the first mandatory part of file-based
> > > rebalance procedure) I'd like to focus on the next step of the current
> > > IEP-28 - the process of creating snapshots of cache group partitions.
> > > 
> > > Previously, we've discussed the creation of cache group backups [3] for
> > > the whole cluster. I'd like to take into account the GridGain
> > > experience with such snapshot creation and, at first, focus on the
> > > local internal IgniteBackupManager which will be used for this purpose
> > > [4] [1].
> > > 
> > > Changes are almost ready. I need some additional time to finalize the
> > > PR with backup creation.
> > > 
> > > 
> > > API (create local partitions copy)
> > > 
> > > /**
> > >  * @param name Unique backup name.
> > >  * @param parts Collection of pairs of cache group and appropriate
> > > partitions to be backed up.
> > >  * @param dir Local backup directory.
> > >  */
> > > public IgniteInternalFuture<?> backup(
> > >     String name,
> > >     Map<Integer, Set<Integer>> parts,
> > >     File dir,
> > >     ExecutorService backupSvc (this can be completely optional)
> > > );
> > > 
> > > 
> > > API (backup partitions over the network)
> > > 
> > > /**
> > >  * @param name Unique backup name.
> > >  * @param parts Collection of pairs of cache group and appropriate
> > > partitions to be backed up.
> > >  * @param snd File sender provider.
> > >  */
> > > public IgniteInternalFuture<?> backup(
> > >     String name,
> > >     Map<Integer, Set<Integer>> parts,
> > >     Supplier<GridIoManager.TransmissionSender> snd
> > > );
> > > 
> > > [1] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing#IEP-28:Clusterpeer-2-peerbalancing-Copypartitiononthefly
> > > [2] https://issues.apache.org/jira/browse/IGNITE-10619
> > > [3] http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html
> > > [4] https://issues.apache.org/jira/browse/IGNITE-11073
> > > 
> > > On Wed, 12 Dec 2018 at 11:15, Vladimir Ozerov <vo...@gridgain.com> wrote:
> > > > 
> > > > Maxim,
> > > > 
> > > > Thank you for excellent analysis! From profiling data I see the following:
> > > > 1) Almost no parallelism - one rebalance thread is used (default), two responses are sent per a single demand request (default)
> > > > 2) All system resources are underutilized - CPU, disk, network
> > > > 3) Huge hotspot on free lists
> > > > 
> > > > In general I would recommend to consider the following points during further rebalance optimization:
> > > > 1) Start with the fact that rebalance always causes system degradation due to additional hardware resources required. Different deployments may require different degradation modes. Sometimes "soft" mode is preferable - long rebalance with low system overhead. This is what we see now. Sometimes the opposite - as short rebalance as possible at the cost of severe degradation in operations. Sometimes - something in the middle. Every optimization we made should have clear explanation on how system degrades.
> > > > 2) We need to investigate the hotspot on free lists. Looks like this is the main limiting factor for now. Alex, do you have any ideas what is this? Is it possible to bypass freelists completely during rebalance at the cost of higher data fragmentation during concurrent updates?
> > > > 3) We need to investigate streaming rebalance mode, when supplier constantly streams data to demander similarly to our data streamer. It should be fairly easy to implement, applicable for all modes and may speedup rebalance up to 5-10 times. Great thing about this approach is that it will allow users to choose between system stress level and rebalance throughput easily.
> > > > 4) File transfer rebalance: we need to have clear design of failure and concurrency cases and degradation modes. Several questions to answer:
> > > > 4.1) What would happen if another rebalance starts when previous is not finished yet?
> > > > 4.2) What would happen if supplier or demander fails in the middle? What kind of cleanup would be required
> > > > 4.3) Degradation: what kind of problems should users expect due to massive disk and network load during file transfer and due to data merging on demander side?
> > > > 4.4) Degradation: how secondary indexes would be rebuilt on demander side? Note that until indexes are ready node is not operational and cannot become partition owner, and index rebuild is essentially full data rescan with potentially the same issues with slow updates of persistent data structures we have now.
> > > > 
> > > > Vladimir.
> > > > 
> > > > On Fri, Dec 7, 2018 at 3:32 PM Maxim Muzafarov <ma...@gmail.com> wrote:
> > > > > 
> > > > > Vladimir,
> > > > > 
> > > > > 
> > > > > Let me propose to consider the whole rebalance process as having
> > > > > three strategies:
> > > > > - The classical message-based approach, preferable to use for in-memory caches;
> > > > > - Historical rebalance based on WAL, used for rebalancing persisted
> > > > > caches deltas;
> > > > > - (new) File-based rebalance (current IEP-28), used for relocation of
> > > > > full cache partitions.
> > > > > 
> > > > > 
> > > > > First of all, I want to show you that for the full cache relocation
> > > > > file-based rebalancing strategy from my point has a set of advantages
> > > > > prior to the message-based approach. Let's also assume that the time
> > > > > spent on WAL logging during the rebalance procedure is already
> > > > > optimized (we are not taking it into account at all).
> > > > > 
> > > > > According to preliminary measurements [8] and the message above we
> > > > > spend more than 65% of rebalancing time on creating K-V cache pair for
> > > > > preloading entries and supporting internal data structures. It is true
> > > > > as for in-memory cluster configuration and for a cluster with enabled
> > > > > persistence. It is also true, that these data structures can be used
> > > > > more efficiently by implementing batch entry processing for them. And
> > > > > it should be done (a ticket for it is already created [3]).
> > > > > 
> > > > > Let's have a look closer to the simple example.
> > > > > 
> > > > > I've collected some information about a cache on my stress-testing cluster:
> > > > > partitions (total): 65534
> > > > > single partition size: 437 MB
> > > > > rebalance batch: 512 Kb
> > > > > batches per partition: 874
> > > > > partitions per node: 606
> > > > > batches per node: 529644
> > > > > 
> > > > > Let's assume that we've already implemented batched entry processing
> > > > > and we perform bulk operations over internal data structures.
> > > > > Regarding these assumptions, we still need to process 874 batches per
> > > > > each cache partition to transfer data. It will cost us up to 15 seconds
> > > > > per each partition file, a lot of CPU cycles to maintain internal data
> > > > > structures and block for a while other threads waiting for releasing
> > > > > database checkpoint lock.
> > > > > 
> > > > > Increasing the rebalance batch size is not efficient here because we
> > > > > are starting to hold the database lock for too long. It will lead to
> > > > > thread starvation and will only slow down the whole rebalance speed.
> > > > > Exactly the same as increasing batch size, making the rebalance thread
> > > > > pool bigger can lead to the cluster performance drop for almost the
> > > > > same reasons.
> > > > > 
> > > > > I think the file-based rebalance can give us (even before the batch
> > > > > processing is implemented) for huge caches:
> > > > >  - a fair non-blocking approach in each part of the rebalancing procedure;
> > > > >  - reduce the number of locks being acquired (the other threads can
> > > > > make bigger progress);
> > > > >  - a zero-copy transmission on supplier saves CPU cycles and memory bandwidth;
> > > > >  - as a result, the transferable batch size increased up to the whole
> > > > > partition file size;
> > > > > 
> > > > > SUMMARY TO DO
> > > > > 
> > > > > The plan to do and other ideas (without risks evaluation):
> > > > > 
> > > > > Message-based approach.
> > > > > Optimization to do by priority [3] [2] and may be [9].
> > > > > 
> > > > > Historical rebalance based on WAL.
> > > > > Suppose, nothing to do here as Sergey already working on the issue [1]
> > > > > with turning off WAL.
> > > > > 
> > > > > (new) Full cache data relocation.
> > > > > Prototyping current IEP-28.
> > > > > 
> > > > > I think another approach can be also implemented.
> > > > > During the rebalance procedure we can write entries to data pages
> > > > > directly skipping free lists, PK index and secondary index. Once the
> > > > > partition preloading is finished, we will rebuild free list and all
> > > > > indexes.
> > > > > Will it work for us?
> > > > > 
> > > > > ANSWERS
> > > > > 
> > > > > > 1) Is it correct that supplier sends only one message for every demand
> > > > > > message? If yes, then streaming should improve network utilization a lot.
> > > > > 
> > > > > I think we already have such ability for the Apache Ignite (not
> > > > > exactly streaming). The CacheConfiguration#rebalanceBatchesPrefetchCnt
> > > > > can be used here to reduce the system delay between send\receive
> > > > > message process. The default value is more than enough for most of the
> > > > > cases. The testing results showed only 7 seconds (0.32%) delay during
> > > > > the 40 min cache rebalance procedure. So, each supply message is ready
> > > > > to be sent when the next demand message arrives.
> > > > > 
> > > > > 
> > > > > > 2) Is it correct that for user caches we process supply messages in a
> > > > > > system pool? Did we consider moving it to striped pool? Because if all
> > > > > > operations on a single partition is ordered, we may apply a number of
> > > > > > critical optimizations - bypassing page cache and checkpointer for new
> > > > > > entries, batched index updates, batched free list updates, etc.
> > > > > 
> > > > > I think the rebalance procedure should not cause a thousand messages
> > > > > per second, so we don't need to move the rebalance procedure to the
> > > > > striped pool. We should have a limited stable load for the rebalancing
> > > > > procedure on the cluster. As for the second part, are you talking
> > > > > about thread per partition model? If yes, we have tickets for it [4],
> > > > > [5], [6].
> > > > > 
> > > > > > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> > > > > > conditions when it could be disabled on supplier side?
> > > > > 
> > > > > Do you mean the demander side? Why should we try to disable it on the
> > > > > supplier node? I do not take it into account at all because it can be
> > > > > easily done (suppose issue [1] is about it). But it doesn't help us
> > > > > much for the full cache relocations.
> > > > > 
> > > > > > 4) Most important - have we tried to profile plain single-threaded
> > > > > > rebalance without concurrent write load? We need to have clear
> > > > > > understanding on where time is spent - supplier/demander, cpu/network/disk,
> > > > > > etc. Some Java tracing code should help.
> > > > > 
> > > > > I've updated some information about the profiling results on the
> > > > > confluence page [8]. If you find that I've missed something or the
> > > > > information is unclear, please let me know and I will fix it.
> > > > > 
> > > > > > And one question regarding proposed implementation - how are we going to
> > > > > > handle secondary indexes?
> > > > > 
> > > > > Thank you for pointing this out. Actually, the current IEP page
> > > > > doesn't cover this case. I think we can schedule an index rebuild after
> > > > > all partition files have been transferred. This approach was also
> > > > > mentioned in issue [2].
> > > > > Will it be correct?
> > > > > 
> > > > > 
> > > > > [1] https://issues.apache.org/jira/browse/IGNITE-10505
> > > > > [2] https://issues.apache.org/jira/browse/IGNITE-7934
> > > > > [3] https://issues.apache.org/jira/browse/IGNITE-7935
> > > > > 
> > > > > [4] https://issues.apache.org/jira/browse/IGNITE-4682
> > > > > [5] https://issues.apache.org/jira/browse/IGNITE-4506
> > > > > [6] https://issues.apache.org/jira/browse/IGNITE-4680
> > > > > 
> > > > > [7] https://issues.apache.org/jira/browse/IGNITE-7027
> > > > > [8] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
> > > > > [9] https://issues.apache.org/jira/browse/IGNITE-9520
> > > > > On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
> > > > > > 
> > > > > > Maxim,
> > > > > > 
> > > > > > Regarding MVCC - this is essentially a copy-on-write approach. New entry is
> > > > > > created on every update. They are cleaned asynchronously by dedicated
> > > > > > threads (aka "vacuum").
> > > > > > 
> > > > > > I looked at the document you mentioned, thank you for pointing to it. But
> > > > > > it doesn't answer all questions around the existing design, and what I am
> > > > > > trying to do is to get how deeply we understand the current problems. It is
> > > > > > very true that various subsystems, such as buffer managers, WALs,
> > > > > > supporting structures, etc. incur very serious overhead. And when it comes
> > > > > > to heavy operations, implementors typically seek a way to bypass as many
> > > > > > components as possible, taking into account that different shortcuts lead to
> > > > > > different types of side effects. And IMO our very important goal for now is
> > > > > > to create a space of possible improvements, and estimate their costs, risks
> > > > > > and applicability for the product's configuration space.
> > > > > > 
> > > > > > Let me clarify several questions about the current rebalance implementation, as
> > > > > > I am not a big expert here.
> > > > > > 1) Is it correct that supplier sends only one message for every demand
> > > > > > message? If yes, then streaming should improve network utilization a lot.
> > > > > > 2) Is it correct that for user caches we process supply messages in a
> > > > > > system pool? Did we consider moving it to striped pool? Because if all
> > > > > > operations on a single partition is ordered, we may apply a number of
> > > > > > critical optimizations - bypassing page cache and checkpointer for new
> > > > > > entries, batched index updates, batched free list updates, etc.
> > > > > > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> > > > > > conditions when it could be disabled on supplier side?
> > > > > > 4) Most important - have we tried to profile plain single-threaded
> > > > > > rebalance without concurrent write load? We need to have clear
> > > > > > understanding on where time is spent - supplier/demander, cpu/network/disk,
> > > > > > etc. Some Java tracing code should help.
> > > > > > 
> > > > > > And one question regarding proposed implementation - how are we going to
> > > > > > handle secondary indexes?
> > > > > > 
> > > > > > [1] https://issues.apache.org/jira/browse/IGNITE-8017
> > > > > > 
> > > > > > 
> > > > > > On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
> > > > > > 
> > > > > > > Eduard,
> > > > > > > 
> > > > > > > Thank you very much for the discussion!
> > > > > > > 
> > > > > > > Your algorithm looks much better for me too and easier to implement.
> > > > > > > I'll update appropriate process points on IEP page of the proposed
> > > > > > > rebalance procedure.
> > > > > > > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > > > > > > <ed...@gmail.com> wrote:
> > > > > > > > 
> > > > > > > > So, after some discussion, I could describe another approach on how to
> > > > > > > > build consistent partition on the fly.
> > > > > > > > 
> > > > > > > > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > > > > > > > 2. After checkpoint finish, we start sending partition file (without any
> > > > > > > > lock) to the receiver from 0 to fixed size.
> > > > > > > > 3. Next checkpoints if they detect that they would override some pages of
> > > > > > > > transferring file should write the previous state of a page to a
> > > > > > > 
> > > > > > > dedicated
> > > > > > > > file.
> > > > > > > > So, we would have a list of pages written 1 by 1, page id is written in
> > > > > > > 
> > > > > > > the
> > > > > > > > page itself so we could determine page index. Let's name it log.
> > > > > > > > 4. When transfer finished checkpointer would stop updating log-file. Now
> > > > > > > 
> > > > > > > we
> > > > > > > > are ready to send it to the receiver.
> > > > > > > > 5. On receiver side we start merging the dirty partition file with log
> > > > > > > > (updating it with pages from log-file).
> > > > > > > > 
> > > > > > > > So, an advantage of this method:
> > > > > > > > - checkpoint-thread work couldn't  increase more than twice;
> > > > > > > > - checkpoint-threads shouldn't wait for anything;
> > > > > > > > - in best case, we receive partition without any extra effort.
> > > > > > > > 
> > > > > > > > 
> > > > > > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > > > > > > eduard.shangareev@gmail.com> wrote:
> > > > > > > > 
> > > > > > > > > Maxim,
> > > > > > > > > 
> > > > > > > > > I have looked through your algorithm of reading partition consistently.
> > > > > > > > > And I have some questions/comments.
> > > > > > > > > 
> > > > > > > > > 1. The algorithm requires heavy synchronization between
> > > > > > > 
> > > > > > > checkpoint-thread
> > > > > > > > > and new-approach-rebalance-threads,
> > > > > > > > > because you need strong guarantees to not start writing or reading to
> > > > > > > > > chunk which was updated or started reading by the counterpart.
> > > > > > > > > 
> > > > > > > > > 2. Also, if we have started transferring this chunk in original
> > > > > > > 
> > > > > > > partition
> > > > > > > > > couldn't be updated by checkpoint-threads. They should wait for
> > > > > > > 
> > > > > > > transfer
> > > > > > > > > finishing.
> > > > > > > > > 
> > > > > > > > > 3. If sending is slow and partition is updated then in worst case
> > > > > > > > > checkpoint-threads would create the whole copy of the partition.
> > > > > > > > > 
> > > > > > > > > So, what we have:
> > > > > > > > > -on every page write checkpoint-thread should synchronize with
> > > > > > > > > new-approach-rebalance-threads;
> > > > > > > > > -checkpoint-thread should do extra-work, sometimes this could be as
> > > > > > > 
> > > > > > > huge
> > > > > > > > > as copying the whole partition.
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > > > > > > 
> > > > > > > ilya.kasnacheev@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > 
> > > > > > > > > > Hello!
> > > > > > > > > > 
> > > > > > > > > > This proposal will also happily break my compression-with-dictionary
> > > > > > > 
> > > > > > > patch
> > > > > > > > > > since it relies currently on only having local dictionaries.
> > > > > > > > > > 
> > > > > > > > > > However, when you have compressed data, maybe speed boost is even
> > > > > > > 
> > > > > > > greater
> > > > > > > > > > with your approach.
> > > > > > > > > > 
> > > > > > > > > > Regards,
> > > > > > > > > > --
> > > > > > > > > > Ilya Kasnacheev
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > Fri, 23 Nov 2018 at 13:08, Maxim Muzafarov <ma...@gmail.com>:
> > > > > > > > > > 
> > > > > > > > > > > Igniters,
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > I'd like to take the next step of increasing the Apache Ignite with
> > > > > > > > > > > enabled persistence rebalance speed. Currently, the rebalancing
> > > > > > > > > > > procedure doesn't utilize the network and storage device throughout
> > > > > > > 
> > > > > > > to
> > > > > > > > > > > its full extent even with enough meaningful values of
> > > > > > > > > > > rebalanceThreadPoolSize property. As part of the previous discussion
> > > > > > > > > > > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> > > > > > > > > > > idea [3] of transferring cache partition files over the network.
> > > > > > > > > > > From my point, the case to which this type of rebalancing procedure
> > > > > > > > > > > can bring the most benefit – is adding a completely new node or set
> > > > > > > 
> > > > > > > of
> > > > > > > > > > > new nodes to the cluster. Such a scenario implies fully relocation
> > > > > > > 
> > > > > > > of
> > > > > > > > > > > cache partition files to the new node. To roughly estimate the
> > > > > > > > > > > superiority of partition file transmitting over the network the
> > > > > > > 
> > > > > > > native
> > > > > > > > > > > Linux scp\rsync commands can be used. My test environment showed the
> > > > > > > > > > > result of the new approach as 270 MB/s vs the current 40 MB/s
> > > > > > > > > > > single-threaded rebalance speed.
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > I've prepared the design document IEP-28 [4] and accumulated all the
> > > > > > > > > > > process details of a new rebalance approach on that page. Below you
> > > > > > > > > > > can find the most significant details of the new rebalance procedure
> > > > > > > > > > > and components of the Apache Ignite which are proposed to change.
> > > > > > > > > > > 
> > > > > > > > > > > Any feedback is very appreciated.
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > *PROCESS OVERVIEW*
> > > > > > > > > > > 
> > > > > > > > > > > The whole process is described in terms of rebalancing single cache
> > > > > > > > > > > group and partition files would be rebalanced one-by-one:
> > > > > > > > > > > 
> > > > > > > > > > > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > > > > > > > > > > supplier node;
> > > > > > > > > > > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> > > > > > > > > > > starts the new checkpoint process;
> > > > > > > > > > > 3. The supplier node creates empty the temporary cache partition
> > > > > > > 
> > > > > > > file
> > > > > > > > > > > with .tmp postfix in the same cache persistence directory;
> > > > > > > > > > > 4. The supplier node splits the whole cache partition file into
> > > > > > > > > > > virtual chunks of predefined size (multiply to the PageMemory size);
> > > > > > > > > > > 4.1. If the concurrent checkpoint thread determines the appropriate
> > > > > > > > > > > cache partition file chunk and tries to flush dirty page to the
> > > > > > > 
> > > > > > > cache
> > > > > > > > > > > partition file
> > > > > > > > > > > 4.1.1. If rebalance chunk already transferred
> > > > > > > > > > > 4.1.1.1. Flush the dirty page to the file;
> > > > > > > > > > > 4.1.2. If rebalance chunk not transferred
> > > > > > > > > > > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > > > > > > > > > 4.1.2.2. Flush the dirty page to the file;
> > > > > > > > > > > 4.2. The node starts sending to the demander node each cache
> > > > > > > 
> > > > > > > partition
> > > > > > > > > > > file chunk one by one using FileChannel#transferTo
> > > > > > > > > > > 4.2.1. If the current chunk was modified by checkpoint thread – read
> > > > > > > > > > > it from the temporary cache partition file;
> > > > > > > > > > > 4.2.2. If the current chunk is not touched – read it from the
> > > > > > > 
> > > > > > > original
> > > > > > > > > > > cache partition file;
> > > > > > > > > > > 5. The demander node starts to listen to new pipe incoming
> > > > > > > 
> > > > > > > connections
> > > > > > > > > > > from the supplier node on TcpCommunicationSpi;
> > > > > > > > > > > 6. The demander node creates the temporary cache partition file with
> > > > > > > > > > > .tmp postfix in the same cache persistence directory;
> > > > > > > > > > > 7. The demander node receives each cache partition file chunk one
> > > > > > > 
> > > > > > > by one
> > > > > > > > > > > 7.1. The node checks CRC for each PageMemory in the downloaded
> > > > > > > 
> > > > > > > chunk;
> > > > > > > > > > > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > > > > > > > > > > partition file position;
> > > > > > > > > > > 8. When the demander node receives the whole cache partition file
> > > > > > > > > > > 8.1. The node initializes received .tmp file as its appropriate
> > > > > > > 
> > > > > > > cache
> > > > > > > > > > > partition file;
> > > > > > > > > > > 8.2. Thread-per-partition begins to apply for data entries from the
> > > > > > > > > > > beginning of WAL-temporary storage;
> > > > > > > > > > > 8.3. All async operations corresponding to this partition file still
> > > > > > > > > > > write to the end of temporary WAL;
> > > > > > > > > > > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > > > > > > > > > > 8.4.1. Start the first checkpoint;
> > > > > > > > > > > 8.4.2. Wait for the first checkpoint ends and own the cache
> > > > > > > 
> > > > > > > partition;
> > > > > > > > > > > 8.4.3. All operations now are switched to the partition file instead
> > > > > > > > > > > of writing to the temporary WAL;
> > > > > > > > > > > 8.4.4. Schedule the temporary WAL storage deletion;
> > > > > > > > > > > 9. The supplier node deletes the temporary cache partition file;
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > *COMPONENTS TO CHANGE*
> > > > > > > > > > > 
> > > > > > > > > > > CommunicationSpi
> > > > > > > > > > > 
> > > > > > > > > > > To benefit from zero copy we must delegate the file transferring to
> > > > > > > > > > > FileChannel#transferTo(long, long,
> > > > > > > > > > > java.nio.channels.WritableByteChannel) because the fast path of
> > > > > > > > > > > transferTo method is only executed if the destination buffer
> > > > > > > 
> > > > > > > inherits
> > > > > > > > > > > from an internal JDK class.
> > > > > > > > > > > 
> > > > > > > > > > > Preloader
> > > > > > > > > > > 
> > > > > > > > > > > A new implementation of cache entries preloader assume to be done.
> > > > > > > 
> > > > > > > The
> > > > > > > > > > > new implementation must send and receive cache partition files over
> > > > > > > > > > > the CommunicationSpi channels by chunks of data with validation
> > > > > > > > > > > received items. The new layer over the cache partition file must
> > > > > > > > > > > support direct using of FileChannel#transferTo method over the
> > > > > > > > > > > CommunicationSpi pipe connection. The connection bandwidth of the
> > > > > > > > > > > cache partition file transfer must have the ability to be limited at
> > > > > > > > > > > runtime.
> > > > > > > > > > > 
> > > > > > > > > > > Checkpointer
> > > > > > > > > > > 
> > > > > > > > > > > When the supplier node receives the cache partition file demand
> > > > > > > > > > > request it will send the file over the CommunicationSpi. The cache
> > > > > > > > > > > partition file can be concurrently updated by checkpoint thread
> > > > > > > 
> > > > > > > during
> > > > > > > > > > > its transmission. To guarantee the file consistency Сheckpointer
> > > > > > > 
> > > > > > > must
> > > > > > > > > > > use copy-on-write technique and save a copy of updated chunk into
> > > > > > > 
> > > > > > > the
> > > > > > > > > > > temporary file.
> > > > > > > > > > > 
> > > > > > > > > > > (new) Catch-up temporary WAL
> > > > > > > > > > > 
> > > > > > > > > > > While the demander node is in the partition file transmission state
> > > > > > > 
> > > > > > > it
> > > > > > > > > > > must save all cache entries corresponding to the moving partition
> > > > > > > 
> > > > > > > into
> > > > > > > > > > > a new temporary WAL storage. These entries will be applied later one
> > > > > > > > > > > by one on the received cache partition file. All asynchronous
> > > > > > > > > > > operations will be enrolled to the end of temporary WAL storage
> > > > > > > 
> > > > > > > during
> > > > > > > > > > > storage reads until it becomes fully read. The file-based FIFO
> > > > > > > > > > > approach assumes to be used by this process.
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > *RECOVERY*
> > > > > > > > > > > 
> > > > > > > > > > > In case of crash recovery, there is no additional actions need to be
> > > > > > > > > > > applied to keep the cache partition file consistency. We are not
> > > > > > > > > > > recovering partition with the moving state, thus the single
> > > > > > > 
> > > > > > > partition
> > > > > > > > > > > file will be lost and only it. The uniqueness of it is guaranteed by
> > > > > > > > > > > the single-file-transmission process. The cache partition file will
> > > > > > > 
> > > > > > > be
> > > > > > > > > > > fully loaded on the next rebalance procedure.
> > > > > > > > > > > 
> > > > > > > > > > > To provide default cluster recovery guarantee we must to:
> > > > > > > > > > > 1. Start the checkpoint process when the temporary WAL storage
> > > > > > > 
> > > > > > > becomes
> > > > > > > > > > > empty;
> > > > > > > > > > > 2. Wait for the first checkpoint ends and set owning status to the
> > > > > > > > > > > cache partition;
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > [1]
> > > > > > > > > > > 
> > > > > > > 
> > > > > > > http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > > > > > > > > > [2]
> > > > > > > > > > > 
> > > > > > > 
> > > > > > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > > > > > > > > > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > > > > > > > > > [4]
> > > > > > > > > > > 
> > > > > > > 
> > > > > > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > > > > > > > > > 

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Maxim Muzafarov <ma...@gmail.com>.
Nikolay,

In my message above I've described only the internal local BackupManager
for the rebalance needs, but I also have some thoughts on the backup
feature for the whole Ignite cluster. I'll give you a detailed answer in
the appropriate discussion topic [1] a bit later.

[1] http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html
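For illustration, a rough usage sketch of the proposed local API (the
signatures are quoted below and are still a proposal, not a final API;
backupMgr here stands for the internal IgniteBackupManager instance):

    // Back up partitions 0 and 1 of one cache group into a local directory.
    Map<Integer, Set<Integer>> parts = new HashMap<>();
    parts.put(CU.cacheId("my-cache-group"), new HashSet<>(Arrays.asList(0, 1)));

    IgniteInternalFuture<?> fut = backupMgr.backup(
        "backup-2019-08-14",             // unique backup name
        parts,                           // group id -> partition ids
        new File("/var/ignite/backups"),
        null                             // optional executor
    );

    fut.get(); // wait for the local copy to complete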

On Wed, 14 Aug 2019 at 16:40, Nikolay Izhikov <ni...@apache.org> wrote:
>
> Hello, Maxim.
>
> I think backup is a great feature for Ignite.
> Let's have it!
>
> A few notes on it:
>
> 1. The backup directory should be taken from the node configuration.
>
> 2. A backup should be stored on the local node only.
> An Ignite admin can write an sh script to move all backed-up partitions to one storage himself.
>
> 3. Ignite should provide CLI tools to start the backup/restore procedure.
>
> Questions:
>
> 1. How would each backup be identified?
> 2. Do you plan to implement backup of a cache or a cache group?
> 3. How would the restore process be implemented from the user's point of view?
>         Can we interact with the cache during a restore?
>
> On Wed, 14/08/2019 at 16:13 +0300, Maxim Muzafarov wrote:
> > Igniters,
> >
> >
> > Since the file transmission between Ignite nodes [2] has been merged
> > to the master branch (it is the first mandatory part of the file-based
> > rebalance procedure) I'd like to focus on the next step of the current
> > IEP-28 - the process of creating snapshots of cache group partitions.
> >
> > Previously, we've discussed the creation of cache group backups [3] for
> > the whole cluster. I'd like to take into account the GridGain
> > experience with such snapshot creation and, at first, focus on the
> > local internal IgniteBackupManager which will be used for this purpose
> > [4] [1].
> >
> > Changes are almost ready. I need some additional time to finalize the
> > PR with backup creation.
> >
> >
> > API (create local partitions copy)
> >
> > /**
> >  * @param name Unique backup name.
> >  * @param parts Collection of pairs of cache group and appropriate cache
> > partitions to be backed up.
> >  * @param dir Local backup directory.
> >  */
> > public IgniteInternalFuture<?> backup(
> >     String name,
> >     Map<Integer, Set<Integer>> parts,
> >     File dir,
> >     ExecutorService backupSvc (this can be completely optional)
> > );
> >
> >
> > API (backup partitions over the network)
> >
> > /**
> >  * @param name Unique backup name.
> >  * @param parts Collection of pairs of cache group and appropriate cache
> > partitions to be backed up.
> >  * @param snd File sender provider.
> >  */
> > public IgniteInternalFuture<?> backup(
> >     String name,
> >     Map<Integer, Set<Integer>> parts,
> >     Supplier<GridIoManager.TransmissionSender> snd
> > );
> >
> > [1] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing#IEP-28:Clusterpeer-2-peerbalancing-Copypartitiononthefly
> > [2] https://issues.apache.org/jira/browse/IGNITE-10619
> > [3] http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html
> > [4] https://issues.apache.org/jira/browse/IGNITE-11073
> >
> > On Wed, 12 Dec 2018 at 11:15, Vladimir Ozerov <vo...@gridgain.com> wrote:
> > >
> > > Maxim,
> > >
> > > Thank you for excellent analysis! From profiling data I see the following:
> > > 1) Almost no parallelism - one rebalance thread is used (default), two responses are sent per a single demand request (default)
> > > 2) All system resources are underutilized - CPU, disk, network
> > > 3) Huge hotspot on free lists
> > >
> > > In general I would recommend to consider the following points during further rebalance optimization:
> > > 1) Start with the fact that rebalance always causes system degradation due to the additional hardware resources required. Different deployments may require different degradation modes. Sometimes "soft" mode is preferable - a long rebalance with low system overhead. This is what we see now. Sometimes the opposite - as short a rebalance as possible at the cost of severe degradation in operations. Sometimes - something in the middle. Every optimization we make should have a clear explanation of how the system degrades.
> > > 2) We need to investigate the hotspot on free lists. Looks like this is the main limiting factor for now. Alex, do you have any ideas what is this? Is it possible to bypass freelists completely during rebalance at the cost of higher data fragmentation during concurrent updates?
> > > 3) We need to investigate a streaming rebalance mode, when the supplier constantly streams data to the demander similarly to our data streamer (see the sketch after this list). It should be fairly easy to implement, applicable for all modes and may speed up rebalance up to 5-10 times. A great thing about this approach is that it will allow users to choose between system stress level and rebalance throughput easily.
> > > 4) File transfer rebalance: we need to have clear design of failure and concurrency cases and degradation modes. Several questions to answer:
> > > 4.1) What would happen if another rebalance starts when previous is not finished yet?
> > > 4.2) What would happen if supplier or demander fails in the middle? What kind of cleanup would be required
> > > 4.3) Degradation: what kind of problems should users expect due to massive disk and network load during file transfer and due to data merging on demander side?
> > > 4.4) Degradation: how secondary indexes would be rebuilt on demander side? Note that until indexes are ready node is not operational and cannot become partition owner, and index rebuild is essentially full data rescan with potentially the same issues with slow updates of persistent data structures we have now.
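> > >
> > > To illustrate (3): the supplier could push entries to the demander roughly the
> > > way the public IgniteDataStreamer batches and flushes them today; a tiny sketch
> > > of that API (not a rebalance-specific mode; ignite and entries below are just a
> > > local node handle and some prepared data):
> > >
> > >     try (IgniteDataStreamer<Integer, byte[]> streamer = ignite.dataStreamer("my-cache")) {
> > >         streamer.allowOverwrite(true);    // existing entries may be updated
> > >         streamer.perNodeBufferSize(1024); // entries buffered per node before an automatic flush
> > >
> > >         for (Map.Entry<Integer, byte[]> e : entries)
> > >             streamer.addData(e.getKey(), e.getValue());
> > >     } // close() flushes the remaining buffered data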
> > >
> > > Vladimir.
> > >
> > > On Fri, Dec 7, 2018 at 3:32 PM Maxim Muzafarov <ma...@gmail.com> wrote:
> > > >
> > > > Vladimir,
> > > >
> > > >
> > > > Let me propose to consider this whole rebalance process as having
> > > > three strategies:
> > > > - The classical message-based approach, preferable to use for in-memory caches;
> > > > - Historical rebalance based on WAL, used for rebalancing persisted
> > > > caches deltas;
> > > > - (new) File-based rebalance (current IEP-28), used for relocation of
> > > > full cache partitions.
> > > >
> > > >
> > > > First of all, I want to show you that for full cache relocation the
> > > > file-based rebalancing strategy, from my point of view, has a set of
> > > > advantages over the message-based approach. Let's also assume that the time
> > > > spent on WAL logging during the rebalance procedure is already
> > > > optimized (we are not taking it into account at all).
> > > >
> > > > According to preliminary measurements [8] and the message above, we
> > > > spend more than 65% of the rebalancing time on creating the K-V cache
> > > > pairs for preloading entries and on supporting internal data structures.
> > > > This is true both for an in-memory cluster configuration and for a
> > > > cluster with persistence enabled. It is also true that these data
> > > > structures can be used more efficiently by implementing batch entry
> > > > processing for them. And it should be done (a ticket for it is already
> > > > created [3]).
> > > >
> > > > Let's take a closer look at a simple example.
> > > >
> > > > I've collected some information about a cache on my stress-testing cluster:
> > > > partitions (total): 65534
> > > > single partition size: 437 MB
> > > > rebalance batch: 512 Kb
> > > > batches per partition: 874
> > > > partitions per node: 606
> > > > batches per node: 529644
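> > > >
> > > > For reference, the per-partition and per-node figures follow directly from
> > > > these numbers (binary units assumed):
> > > >
> > > >     437 MB / 512 KB = 447488 KB / 512 KB = 874 batches per partition
> > > >     874 batches * 606 partitions = 529644 batches per node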
> > > >
> > > > Let's assume that we've already implemented batched entry processing
> > > > and we perform bulk operations over internal data structures.
> > > > Under these assumptions, we still need to process 874 batches per
> > > > cache partition to transfer the data. It will cost us up to 15 seconds
> > > > per partition file, a lot of CPU cycles to maintain internal data
> > > > structures, and it will block other threads for a while as they wait for
> > > > the database checkpoint lock to be released.
> > > >
> > > > Increasing the rebalance batch size is not efficient here because we
> > > > are starting to hold the database lock for too long. It will lead to
> > > > thread starvation and will only slow down the whole rebalance speed.
> > > > Exactly the same as increasing batch size, making the rebalance thread
> > > > pool bigger can lead to the cluster performance drop for almost the
> > > > same reasons.
> > > >
> > > > I think that for huge caches the file-based rebalance can give us
> > > > (compared to the batch processing):
> > > >  - a fair non-blocking approach in each part of the rebalancing procedure;
> > > >  - reduce the number of locks being acquired (the other threads can
> > > > make bigger progress);
> > > >  - a zero-copy transmission on supplier saves CPU cycles and memory bandwidth;
> > > >  - as a result, the transferable batch size increased up to the whole
> > > > partition file size;
> > > >
> > > > SUMMARY TO DO
> > > >
> > > > The plan to do and other ideas (without risks evaluation):
> > > >
> > > > Message-based approach.
> > > > Optimization to do by priority [3] [2] and may be [9].
> > > >
> > > > Historical rebalance based on WAL.
> > > > I suppose there is nothing to do here, as Sergey is already working on the
> > > > issue [1] of turning off the WAL.
> > > >
> > > > (new) Full cache data relocation.
> > > > Prototyping current IEP-28.
> > > >
> > > > I think another approach can be also implemented.
> > > > During the rebalance procedure we can write entries to data pages
> > > > directly skipping free lists, PK index and secondary index. Once the
> > > > partition preloading is finished, we will rebuild free list and all
> > > > indexes.
> > > > Will it work for us?
> > > >
> > > > ANSWERS
> > > >
> > > > > 1) Is it correct that supplier sends only one message for every demand
> > > > > message? If yes, then streaming should improve network utilization a lot.
> > > >
> > > > I think we already have such an ability in Apache Ignite (not
> > > > exactly streaming). The CacheConfiguration#rebalanceBatchesPrefetchCnt
> > > > can be used here to reduce the system delay between send\receive
> > > > message process. The default value is more than enough for most of the
> > > > cases. The testing results showed only 7 seconds (0.32%) delay during
> > > > the 40 min cache rebalance procedure. So, each supply message is ready
> > > > to be sent when the next demand message arrives.
> > > >
> > > >
> > > > > 2) Is it correct that for user caches we process supply messages in a
> > > > > system pool? Did we consider moving it to striped pool? Because if all
> > > > > operations on a single partition is ordered, we may apply a number of
> > > > > critical optimizations - bypassing page cache and checkpointer for new
> > > > > entries, batched index updates, batched free list updates, etc.
> > > >
> > > > I think the rebalance procedure should not cause a thousand messages
> > > > per second, so we don't need to move the rebalance procedure to the
> > > > striped pool. We should have a limited stable load for rebalancing
> > > > procedure on the cluster. As for the second part, are you talking
> > > > about thread per partition model? If yes, we have tickets for it [4],
> > > > [5], [6].
> > > >
> > > > > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> > > > > conditions when it could be disabled on supplier side?
> > > >
> > > > Do you mean the demander side? Why should we try to disable it on the
> > > > supplier node? I do not take it into account at all because it can be
> > > > easily done (suppose issue [1] is about it). But it doesn't help us
> > > > much for the full cache relocations.
> > > >
> > > > > 4) Most important - have we tried to profile plain single-threaded
> > > > > rebalance without concurrent write load? We need to have clear
> > > > > understanding on where time is spent - supplier/demander, cpu/network/disk,
> > > > > etc. Some Java tracing code should help.
> > > >
> > > > I've updated some information about profiling results on the
> > > > confluence page [8]. If you find that I've missed something or
> > > > information is unclear, please, let me know and I will fix it.
> > > >
> > > > > And one question regarding proposed implementation - how are we going to
> > > > > handle secondary indexes?
> > > >
> > > > Thank you for pointing this out. Actually, the current IEP page
> > > > doesn't cover this case. I think we can schedule an index rebuild after
> > > > all partition files have been transferred. This approach was also
> > > > mentioned in issue [2].
> > > > Will it be correct?
> > > >
> > > >
> > > > [1] https://issues.apache.org/jira/browse/IGNITE-10505
> > > > [2] https://issues.apache.org/jira/browse/IGNITE-7934
> > > > [3] https://issues.apache.org/jira/browse/IGNITE-7935
> > > >
> > > > [4] https://issues.apache.org/jira/browse/IGNITE-4682
> > > > [5] https://issues.apache.org/jira/browse/IGNITE-4506
> > > > [6] https://issues.apache.org/jira/browse/IGNITE-4680
> > > >
> > > > [7] https://issues.apache.org/jira/browse/IGNITE-7027
> > > > [8] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
> > > > [9] https://issues.apache.org/jira/browse/IGNITE-9520
> > > > On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
> > > > >
> > > > > Maxim,
> > > > >
> > > > > Regarding MVCC - this is essentially a copy-on-write approach. New entry is
> > > > > created on every update. They are cleaned asynchronously by dedicated
> > > > > threads (aka "vacuum").
> > > > >
> > > > > I looked at the document you mentioned, thank you for pointing to it. But
> > > > > it doesn't answer all questions around existing design, and what I am
> > > > > trying to do is to get how deep do we understand current problems. It is
> > > > > very true that various subsystems, such as buffer managers, WALs,
> > > > > supporting structures, etc. incur very serious overhead. And when it comes
> > > > > to heavy operations implementors typically seek for a way to bypass as much
> > > > > components as possible, taking into account that different shortcuts lead to
> > > > > different types of side effects. And IMO our very important goal for now is
> > > > > to create space of possible improvements, and estimate their costs, risks
> > > > > and applicability for product's configuration space.
> > > > >
> > > > > Let me clarify several questions about the current rebalance implementation, as
> > > > > I am not a big expert here.
> > > > > 1) Is it correct that supplier sends only one message for every demand
> > > > > message? If yes, then streaming should improve network utilization a lot.
> > > > > 2) Is it correct that for user caches we process supply messages in a
> > > > > system pool? Did we consider moving it to striped pool? Because if all
> > > > > operations on a single partition is ordered, we may apply a number of
> > > > > critical optimizations - bypassing page cache and checkpointer for new
> > > > > entries, batched index updates, batched free list updates, etc.
> > > > > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> > > > > conditions when it could be disabled on supplier side?
> > > > > 4) Most important - have we tried to profile plain single-threaded
> > > > > rebalance without concurrent write load? We need to have clear
> > > > > understanding on where time is spent - supplier/demander, cpu/network/disk,
> > > > > etc. Some Java tracing code should help.
> > > > >
> > > > > And one question regarding proposed implementation - how are we going to
> > > > > handle secondary indexes?
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/IGNITE-8017
> > > > >
> > > > >
> > > > > On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
> > > > >
> > > > > > Eduard,
> > > > > >
> > > > > > Thank you very much for the discussion!
> > > > > >
> > > > > > Your algorithm looks much better for me too and easier to implement.
> > > > > > I'll update appropriate process points on IEP page of the proposed
> > > > > > rebalance procedure.
> > > > > > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > > > > > <ed...@gmail.com> wrote:
> > > > > > >
> > > > > > > So, after some discussion, I could describe another approach on how to
> > > > > > > build consistent partition on the fly.
> > > > > > >
> > > > > > > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > > > > > > 2. After checkpoint finish, we start sending partition file (without any
> > > > > > > lock) to the receiver from 0 to fixed size.
> > > > > > > 3. Next checkpoints if they detect that they would override some pages of
> > > > > > > transferring file should write the previous state of a page to a
> > > > > >
> > > > > > dedicated
> > > > > > > file.
> > > > > > > So, we would have a list of pages written 1 by 1, page id is written in
> > > > > >
> > > > > > the
> > > > > > > page itself so we could determine page index. Let's name it log.
> > > > > > > 4. When transfer finished checkpointer would stop updating log-file. Now
> > > > > >
> > > > > > we
> > > > > > > are ready to send it to the receiver.
> > > > > > > 5. On receiver side we start merging the dirty partition file with log
> > > > > > > (updating it with pages from log-file).
> > > > > > >
> > > > > > > So, an advantage of this method:
> > > > > > > - checkpoint-thread work couldn't  increase more than twice;
> > > > > > > - checkpoint-threads shouldn't wait for anything;
> > > > > > > - in best case, we receive partition without any extra effort.
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > > > > > eduard.shangareev@gmail.com> wrote:
> > > > > > >
> > > > > > > > Maxim,
> > > > > > > >
> > > > > > > > I have looked through your algorithm of reading partition consistently.
> > > > > > > > And I have some questions/comments.
> > > > > > > >
> > > > > > > > 1. The algorithm requires heavy synchronization between
> > > > > >
> > > > > > checkpoint-thread
> > > > > > > > and new-approach-rebalance-threads,
> > > > > > > > because you need strong guarantees to not start writing or reading to
> > > > > > > > chunk which was updated or started reading by the counterpart.
> > > > > > > >
> > > > > > > > 2. Also, if we have started transferring this chunk in original
> > > > > >
> > > > > > partition
> > > > > > > > couldn't be updated by checkpoint-threads. They should wait for
> > > > > >
> > > > > > transfer
> > > > > > > > finishing.
> > > > > > > >
> > > > > > > > 3. If sending is slow and partition is updated then in worst case
> > > > > > > > checkpoint-threads would create the whole copy of the partition.
> > > > > > > >
> > > > > > > > So, what we have:
> > > > > > > > -on every page write checkpoint-thread should synchronize with
> > > > > > > > new-approach-rebalance-threads;
> > > > > > > > -checkpoint-thread should do extra-work, sometimes this could be as
> > > > > >
> > > > > > huge
> > > > > > > > as copying the whole partition.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > > > > >
> > > > > > ilya.kasnacheev@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello!
> > > > > > > > >
> > > > > > > > > This proposal will also happily break my compression-with-dictionary
> > > > > >
> > > > > > patch
> > > > > > > > > since it relies currently on only having local dictionaries.
> > > > > > > > >
> > > > > > > > > However, when you have compressed data, maybe speed boost is even
> > > > > >
> > > > > > greater
> > > > > > > > > with your approach.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > --
> > > > > > > > > Ilya Kasnacheev
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Fri, 23 Nov 2018 at 13:08, Maxim Muzafarov <ma...@gmail.com>:
> > > > > > > > >
> > > > > > > > > > Igniters,
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I'd like to take the next step of increasing the Apache Ignite with
> > > > > > > > > > enabled persistence rebalance speed. Currently, the rebalancing
> > > > > > > > > > procedure doesn't utilize the network and storage device throughout
> > > > > >
> > > > > > to
> > > > > > > > > > its full extent even with enough meaningful values of
> > > > > > > > > > rebalanceThreadPoolSize property. As part of the previous discussion
> > > > > > > > > > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> > > > > > > > > > idea [3] of transferring cache partition files over the network.
> > > > > > > > > > From my point, the case to which this type of rebalancing procedure
> > > > > > > > > > can bring the most benefit – is adding a completely new node or set
> > > > > >
> > > > > > of
> > > > > > > > > > new nodes to the cluster. Such a scenario implies fully relocation
> > > > > >
> > > > > > of
> > > > > > > > > > cache partition files to the new node. To roughly estimate the
> > > > > > > > > > superiority of partition file transmitting over the network the
> > > > > >
> > > > > > native
> > > > > > > > > > Linux scp\rsync commands can be used. My test environment showed the
> > > > > > > > > > result of the new approach as 270 MB/s vs the current 40 MB/s
> > > > > > > > > > single-threaded rebalance speed.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I've prepared the design document IEP-28 [4] and accumulated all the
> > > > > > > > > > process details of a new rebalance approach on that page. Below you
> > > > > > > > > > can find the most significant details of the new rebalance procedure
> > > > > > > > > > and components of the Apache Ignite which are proposed to change.
> > > > > > > > > >
> > > > > > > > > > Any feedback is very appreciated.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *PROCESS OVERVIEW*
> > > > > > > > > >
> > > > > > > > > > The whole process is described in terms of rebalancing single cache
> > > > > > > > > > group and partition files would be rebalanced one-by-one:
> > > > > > > > > >
> > > > > > > > > > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > > > > > > > > > supplier node;
> > > > > > > > > > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> > > > > > > > > > starts the new checkpoint process;
> > > > > > > > > > 3. The supplier node creates empty the temporary cache partition
> > > > > >
> > > > > > file
> > > > > > > > > > with .tmp postfix in the same cache persistence directory;
> > > > > > > > > > 4. The supplier node splits the whole cache partition file into
> > > > > > > > > > virtual chunks of predefined size (multiply to the PageMemory size);
> > > > > > > > > > 4.1. If the concurrent checkpoint thread determines the appropriate
> > > > > > > > > > cache partition file chunk and tries to flush dirty page to the
> > > > > >
> > > > > > cache
> > > > > > > > > > partition file
> > > > > > > > > > 4.1.1. If rebalance chunk already transferred
> > > > > > > > > > 4.1.1.1. Flush the dirty page to the file;
> > > > > > > > > > 4.1.2. If rebalance chunk not transferred
> > > > > > > > > > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > > > > > > > > 4.1.2.2. Flush the dirty page to the file;
> > > > > > > > > > 4.2. The node starts sending to the demander node each cache
> > > > > >
> > > > > > partition
> > > > > > > > > > file chunk one by one using FileChannel#transferTo
> > > > > > > > > > 4.2.1. If the current chunk was modified by checkpoint thread – read
> > > > > > > > > > it from the temporary cache partition file;
> > > > > > > > > > 4.2.2. If the current chunk is not touched – read it from the
> > > > > >
> > > > > > original
> > > > > > > > > > cache partition file;
> > > > > > > > > > 5. The demander node starts to listen to new pipe incoming
> > > > > >
> > > > > > connections
> > > > > > > > > > from the supplier node on TcpCommunicationSpi;
> > > > > > > > > > 6. The demander node creates the temporary cache partition file with
> > > > > > > > > > .tmp postfix in the same cache persistence directory;
> > > > > > > > > > 7. The demander node receives each cache partition file chunk one
> > > > > >
> > > > > > by one
> > > > > > > > > > 7.1. The node checks CRC for each PageMemory in the downloaded
> > > > > >
> > > > > > chunk;
> > > > > > > > > > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > > > > > > > > > partition file position;
> > > > > > > > > > 8. When the demander node receives the whole cache partition file
> > > > > > > > > > 8.1. The node initializes received .tmp file as its appropriate
> > > > > >
> > > > > > cache
> > > > > > > > > > partition file;
> > > > > > > > > > 8.2. Thread-per-partition begins to apply for data entries from the
> > > > > > > > > > beginning of WAL-temporary storage;
> > > > > > > > > > 8.3. All async operations corresponding to this partition file still
> > > > > > > > > > write to the end of temporary WAL;
> > > > > > > > > > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > > > > > > > > > 8.4.1. Start the first checkpoint;
> > > > > > > > > > 8.4.2. Wait for the first checkpoint ends and own the cache
> > > > > >
> > > > > > partition;
> > > > > > > > > > 8.4.3. All operations now are switched to the partition file instead
> > > > > > > > > > of writing to the temporary WAL;
> > > > > > > > > > 8.4.4. Schedule the temporary WAL storage deletion;
> > > > > > > > > > 9. The supplier node deletes the temporary cache partition file;
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *COMPONENTS TO CHANGE*
> > > > > > > > > >
> > > > > > > > > > CommunicationSpi
> > > > > > > > > >
> > > > > > > > > > To benefit from zero copy we must delegate the file transferring to
> > > > > > > > > > FileChannel#transferTo(long, long,
> > > > > > > > > > java.nio.channels.WritableByteChannel) because the fast path of
> > > > > > > > > > transferTo method is only executed if the destination buffer
> > > > > >
> > > > > > inherits
> > > > > > > > > > from an internal JDK class.
> > > > > > > > > >
> > > > > > > > > > Preloader
> > > > > > > > > >
> > > > > > > > > > A new implementation of cache entries preloader assume to be done.
> > > > > >
> > > > > > The
> > > > > > > > > > new implementation must send and receive cache partition files over
> > > > > > > > > > the CommunicationSpi channels by chunks of data with validation
> > > > > > > > > > received items. The new layer over the cache partition file must
> > > > > > > > > > support direct using of FileChannel#transferTo method over the
> > > > > > > > > > CommunicationSpi pipe connection. The connection bandwidth of the
> > > > > > > > > > cache partition file transfer must have the ability to be limited at
> > > > > > > > > > runtime.
> > > > > > > > > >
> > > > > > > > > > Checkpointer
> > > > > > > > > >
> > > > > > > > > > When the supplier node receives the cache partition file demand
> > > > > > > > > > request it will send the file over the CommunicationSpi. The cache
> > > > > > > > > > partition file can be concurrently updated by checkpoint thread
> > > > > >
> > > > > > during
> > > > > > > > > > its transmission. To guarantee the file consistency Сheckpointer
> > > > > >
> > > > > > must
> > > > > > > > > > use copy-on-write technique and save a copy of updated chunk into
> > > > > >
> > > > > > the
> > > > > > > > > > temporary file.
> > > > > > > > > >
> > > > > > > > > > (new) Catch-up temporary WAL
> > > > > > > > > >
> > > > > > > > > > While the demander node is in the partition file transmission state
> > > > > >
> > > > > > it
> > > > > > > > > > must save all cache entries corresponding to the moving partition
> > > > > >
> > > > > > into
> > > > > > > > > > a new temporary WAL storage. These entries will be applied later one
> > > > > > > > > > by one on the received cache partition file. All asynchronous
> > > > > > > > > > operations will be enrolled to the end of temporary WAL storage
> > > > > >
> > > > > > during
> > > > > > > > > > storage reads until it becomes fully read. The file-based FIFO
> > > > > > > > > > approach assumes to be used by this process.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *RECOVERY*
> > > > > > > > > >
> > > > > > > > > > In case of crash recovery, there is no additional actions need to be
> > > > > > > > > > applied to keep the cache partition file consistency. We are not
> > > > > > > > > > recovering partition with the moving state, thus the single
> > > > > >
> > > > > > partition
> > > > > > > > > > file will be lost and only it. The uniqueness of it is guaranteed by
> > > > > > > > > > the single-file-transmission process. The cache partition file will
> > > > > >
> > > > > > be
> > > > > > > > > > fully loaded on the next rebalance procedure.
> > > > > > > > > >
> > > > > > > > > > To provide default cluster recovery guarantee we must to:
> > > > > > > > > > 1. Start the checkpoint process when the temporary WAL storage
> > > > > >
> > > > > > becomes
> > > > > > > > > > empty;
> > > > > > > > > > 2. Wait for the first checkpoint ends and set owning status to the
> > > > > > > > > > cache partition;
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > >
> > > > > > http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > > > > > > > > [2]
> > > > > > > > > >
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > > > > > > > > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > > > > > > > > [4]
> > > > > > > > > >
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > > > > > > > >

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Nikolay Izhikov <ni...@apache.org>.
Hello, Maxim.

I think backup is a great feature for Ignite.
Let's have it!

A few notes on it:

1. The backup directory should be taken from the node configuration.

2. A backup should be stored on the local node only.
An Ignite admin can write an sh script to move all backed-up partitions to one storage himself.

3. Ignite should provide CLI tools to start the backup/restore procedure.

Questions:

1. How would each backup be identified?
2. Do you plan to implement backup of a cache or a cache group?
3. How would the restore process be implemented from the user's point of view?
        Can we interact with the cache during a restore?

On Wed, 14/08/2019 at 16:13 +0300, Maxim Muzafarov wrote:
> Igniters,
> 
> 
> Since the file transmission between Ignite nodes [2] has been merged
> to the master branch (it is the first mandatory part of the file-based
> rebalance procedure) I'd like to focus on the next step of the current
> IEP-28 - the process of creating snapshots of cache group partitions.
> 
> Previously, we've discussed the creation of cache group backups [3] for
> the whole cluster. I'd like to take into account the GridGain
> experience with such snapshot creation and, at first, focus on the
> local internal IgniteBackupManager which will be used for this purpose
> [4] [1].
> 
> Changes are almost ready. I need some additional time to finalize the
> PR with backup creation.
> 
> 
> API (create local partitions copy)
> 
> /**
>  * @param name Unique backup name.
>  * @param parts Collection of pairs of cache group and appropriate cache
> partitions to be backed up.
>  * @param dir Local backup directory.
>  */
> public IgniteInternalFuture<?> backup(
>     String name,
>     Map<Integer, Set<Integer>> parts,
>     File dir,
>     ExecutorService backupSvc (this can be completely optional)
> );
> 
> 
> API (backup partitions over the network)
> 
> /**
>  * @param name Unique backup name.
>  * @param parts Collection of pairs of cache group and appropriate cache
> partitions to be backed up.
>  * @param snd File sender provider.
>  */
> public IgniteInternalFuture<?> backup(
>     String name,
>     Map<Integer, Set<Integer>> parts,
>     Supplier<GridIoManager.TransmissionSender> snd
> );
> 
> [1] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing#IEP-28:Clusterpeer-2-peerbalancing-Copypartitiononthefly
> [2] https://issues.apache.org/jira/browse/IGNITE-10619
> [3] http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html
> [4] https://issues.apache.org/jira/browse/IGNITE-11073
> 
> On Wed, 12 Dec 2018 at 11:15, Vladimir Ozerov <vo...@gridgain.com> wrote:
> > 
> > Maxim,
> > 
> > Thank you for excellent analysis! From profiling data I see the following:
> > 1) Almost no parallelism - one rebalance thread is used (default), two responses are sent per a single demand request (default)
> > 2) All system resources are underutilized - CPU, disk, network
> > 3) Huge hotspot on free lists
> > 
> > In general I would recommend to consider the following points during further rebalance optimization:
> > 1) Start with the fact that rebalance always causes system degradation due to the additional hardware resources required. Different deployments may require different degradation modes. Sometimes "soft" mode is preferable - a long rebalance with low system overhead. This is what we see now. Sometimes the opposite - as short a rebalance as possible at the cost of severe degradation in operations. Sometimes - something in the middle. Every optimization we make should have a clear explanation of how the system degrades.
> > 2) We need to investigate the hotspot on free lists. Looks like this is the main limiting factor for now. Alex, do you have any ideas what is this? Is it possible to bypass freelists completely during rebalance at the cost of higher data fragmentation during concurrent updates?
> > 3) We need to investigate a streaming rebalance mode, when the supplier constantly streams data to the demander similarly to our data streamer. It should be fairly easy to implement, applicable for all modes and may speed up rebalance up to 5-10 times. A great thing about this approach is that it will allow users to choose between system stress level and rebalance throughput easily.
> > 4) File transfer rebalance: we need to have clear design of failure and concurrency cases and degradation modes. Several questions to answer:
> > 4.1) What would happen if another rebalance starts when previous is not finished yet?
> > 4.2) What would happen if supplier or demander fails in the middle? What kind of cleanup would be required
> > 4.3) Degradation: what kind of problems should users expect due to massive disk and network load during file transfer and due to data merging on demander side?
> > 4.4) Degradation: how secondary indexes would be rebuilt on demander side? Note that until indexes are ready node is not operational and cannot become partition owner, and index rebuild is essentially full data rescan with potentially the same issues with slow updates of persistent data structures we have now.
> > 
> > Vladimir.
> > 
> > On Fri, Dec 7, 2018 at 3:32 PM Maxim Muzafarov <ma...@gmail.com> wrote:
> > > 
> > > Vladimir,
> > > 
> > > 
> > > Let me propose to consider this whole rebalance process as having
> > > three strategies:
> > > - The classical message-based approach, preferable to use for in-memory caches;
> > > - Historical rebalance based on WAL, used for rebalancing persisted
> > > caches deltas;
> > > - (new) File-based rebalance (current IEP-28), used for relocation of
> > > full cache partitions.
> > > 
> > > 
> > > First of all, I want to show you that for the full cache relocation
> > > file-based rebalancing strategy from my point has a set of advantages
> > > prior to the message-based approach. Let's also assume that the time
> > > spent on WAL logging during the rebalance procedure is already
> > > optimized (we are not taking it into account at all).
> > > 
> > > According to preliminary measurements [8] and the message above we
> > > spend more than 65% of rebalancing time on creating K-V cache pair for
> > > preloading entries and supporting internal data structures. It is true
> > > as for in-memory cluster configuration and for a cluster with enabled
> > > persistence. It is also true, that these data structures can be used
> > > more efficiently by implementing batch entry processing for them. And
> > > it should be done (a ticket for it is already created [3]).
> > > 
> > > Let's take a closer look at a simple example.
> > > 
> > > I've collected some information about a cache on my stress-testing cluster:
> > > partitions (total): 65534
> > > single partition size: 437 MB
> > > rebalance batch: 512 Kb
> > > batches per partition: 874
> > > partitions per node: 606
> > > batches per node: 529644
> > > 
> > > Let's assume that we've already implemented batched entry processing
> > > and we perform bulk operations over internal data structures.
> > > Under these assumptions, we still need to process 874 batches per
> > > cache partition to transfer the data. It will cost us up to 15 seconds
> > > per each partition file, a lot of CPU cycles to maintain internal data
> > > structures and block for a while other threads waiting for releasing
> > > database checkpoint lock.
> > > 
> > > Increasing the rebalance batch size is not efficient here because we
> > > are starting to hold the database lock for too long. It will lead to
> > > thread starvation and will only slow down the whole rebalance speed.
> > > Exactly the same as increasing batch size, making the rebalance thread
> > > pool bigger can lead to the cluster performance drop for almost the
> > > same reasons.
> > > 
> > > I think that, for huge caches, the file-based rebalance can provide us
> > > (compared to batch processing) with:
> > >  - a fair non-blocking approach in each part of the rebalancing procedure;
> > >  - fewer acquired locks (the other threads can make bigger progress);
> > >  - a zero-copy transmission on the supplier, saving CPU cycles and memory bandwidth;
> > >  - as a result, a transferable batch size increased up to the whole
> > > partition file size.
> > > 
> > > SUMMARY TO DO
> > > 
> > > The plan to do and other ideas (without risks evaluation):
> > > 
> > > Message-based approach.
> > > Optimization to do by priority [3] [2] and may be [9].
> > > 
> > > Historical rebalance based on WAL.
> > > I suppose there is nothing to do here, as Sergey is already working on
> > > the issue [1] of turning off the WAL.
> > > 
> > > (new) Full cache data relocation.
> > > Prototyping current IEP-28.
> > > 
> > > I think another approach can also be implemented.
> > > During the rebalance procedure we can write entries to data pages
> > > directly, skipping the free lists, the PK index and the secondary
> > > indexes. Once the partition preloading is finished, we rebuild the
> > > free lists and all indexes.
> > > Will it work for us?
> > > 
> > > ANSWERS
> > > 
> > > > 1) Is it correct that supplier sends only one message for every demand
> > > > message? If yes, then streaming should improve network utilization a lot.
> > > 
> > > I think we already have such an ability in Apache Ignite (not
> > > exactly streaming). CacheConfiguration#rebalanceBatchesPrefetchCnt
> > > can be used here to reduce the delay between the send/receive
> > > message rounds. The default value is more than enough for most
> > > cases. The testing results showed only a 7 second (0.32%) delay during
> > > a 40 min cache rebalance procedure. So, each supply message is ready
> > > to be sent when the next demand message arrives.
> > > 
> > > 
> > > > 2) Is it correct that for user caches we process supply messages in a
> > > > system pool? Did we consider moving it to striped pool? Because if all
> > > > operations on a single partition is ordered, we may apply a number of
> > > > critical optimizations - bypassing page cache and checkpointer for new
> > > > entries, batched index updates, batched free list updates, etc.
> > > 
> > > I think the rebalance procedure should not cause a thousand messages
> > > per second, so we don't need to move the rebalance procedure to the
> > > striped pool. We should have a limited, stable load from the rebalancing
> > > procedure on the cluster. As for the second part, are you talking
> > > about a thread-per-partition model? If yes, we have tickets for it [4],
> > > [5], [6].
> > > 
> > > > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> > > > conditions when it could be disabled on supplier side?
> > > 
> > > Do you mean the demander side? Why should we try to disable it on the
> > > supplier node? I do not take it into account at all because it can be
> > > easily done (I suppose issue [1] is about it). But it doesn't help us
> > > much with full cache relocation.
> > > 
> > > > 4) Most important - have we tried to profile plain single-threaded
> > > > rebalance without concurrent write load? We need to have clear
> > > > understanding on where time is spent - supplier/demander, cpu/network/disk,
> > > > etc. Some Java tracing code should help.
> > > 
> > > I've updated some information about the profiling results on the
> > > confluence page [8]. If you find that I've missed something or the
> > > information is unclear, please let me know and I will fix it.
> > > 
> > > > And one question regarding proposed implementation - how are we going to
> > > > handle secondary indexes?
> > > 
> > > Thank you for pointing this out. Actually, the current IEP page
> > > doesn't cover this case. I think we can schedule an index rebuild after
> > > all partition files have been transferred. This approach was also
> > > mentioned in issue [2].
> > > Will that be correct?
> > > 
> > > 
> > > [1] https://issues.apache.org/jira/browse/IGNITE-10505
> > > [2] https://issues.apache.org/jira/browse/IGNITE-7934
> > > [3] https://issues.apache.org/jira/browse/IGNITE-7935
> > > 
> > > [4] https://issues.apache.org/jira/browse/IGNITE-4682
> > > [5] https://issues.apache.org/jira/browse/IGNITE-4506
> > > [6] https://issues.apache.org/jira/browse/IGNITE-4680
> > > 
> > > [7] https://issues.apache.org/jira/browse/IGNITE-7027
> > > [8] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
> > > [9] https://issues.apache.org/jira/browse/IGNITE-9520
> > > On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
> > > > 
> > > > Maxim,
> > > > 
> > > > Regarding MVCC - this is essentially a copy-on-write approach. A new entry is
> > > > created on every update. Old entries are cleaned asynchronously by dedicated
> > > > threads (aka "vacuum").
> > > > 
> > > > I looked at the document you mentioned, thank you for pointing to it. But
> > > > it doesn't answer all questions around the existing design, and what I am
> > > > trying to do is to gauge how deeply we understand the current problems. It is
> > > > very true that various subsystems, such as buffer managers, WALs,
> > > > supporting structures, etc. incur very serious overhead. And when it comes
> > > > to heavy operations, implementors typically seek a way to bypass as many
> > > > components as possible, taking into account that different shortcuts lead to
> > > > different types of side effects. And IMO our very important goal for now is
> > > > to map out the space of possible improvements and estimate their costs, risks
> > > > and applicability across the product's configuration space.
> > > > 
> > > > Let me clarify several questions about the current rebalance implementation, as
> > > > I am not a big expert here.
> > > > 1) Is it correct that supplier sends only one message for every demand
> > > > message? If yes, then streaming should improve network utilization a lot.
> > > > 2) Is it correct that for user caches we process supply messages in a
> > > > system pool? Did we consider moving it to the striped pool? Because if all
> > > > operations on a single partition are ordered, we may apply a number of
> > > > critical optimizations - bypassing page cache and checkpointer for new
> > > > entries, batched index updates, batched free list updates, etc.
> > > > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> > > > conditions when it could be disabled on supplier side?
> > > > 4) Most important - have we tried to profile plain single-threaded
> > > > rebalance without concurrent write load? We need to have clear
> > > > understanding on where time is spent - supplier/demander, cpu/network/disk,
> > > > etc. Some Java tracing code should help.
> > > > 
> > > > And one question regarding proposed implementation - how are we going to
> > > > handle secondary indexes?
> > > > 
> > > > [1] https://issues.apache.org/jira/browse/IGNITE-8017
> > > > 
> > > > 
> > > > On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
> > > > 
> > > > > Eduard,
> > > > > 
> > > > > Thank you very much for the discussion!
> > > > > 
> > > > > Your algorithm looks much better for me too and easier to implement.
> > > > > I'll update appropriate process points on IEP page of the proposed
> > > > > rebalance procedure.
> > > > > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > > > > <ed...@gmail.com> wrote:
> > > > > > 
> > > > > > So, after some discussion, I could describe another approach on how to
> > > > > > build consistent partition on the fly.
> > > > > > 
> > > > > > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > > > > > 2. After checkpoint finish, we start sending partition file (without any
> > > > > > lock) to the receiver from 0 to fixed size.
> > > > > > 3. Next checkpoints if they detect that they would override some pages of
> > > > > > transferring file should write the previous state of a page to a
> > > > > 
> > > > > dedicated
> > > > > > file.
> > > > > > So, we would have a list of pages written 1 by 1, page id is written in
> > > > > 
> > > > > the
> > > > > > page itself so we could determine page index. Let's name it log.
> > > > > > 4. When transfer finished checkpointer would stop updating log-file. Now
> > > > > 
> > > > > we
> > > > > > are ready to send it to the receiver.
> > > > > > 5. On receiver side we start merging the dirty partition file with log
> > > > > > (updating it with pages from log-file).
> > > > > > 
> > > > > > So, an advantage of this method:
> > > > > > - checkpoint-thread work couldn't  increase more than twice;
> > > > > > - checkpoint-threads shouldn't wait for anything;
> > > > > > - in best case, we receive partition without any extra effort.
> > > > > > 
> > > > > > 
> > > > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > > > > eduard.shangareev@gmail.com> wrote:
> > > > > > 
> > > > > > > Maxim,
> > > > > > > 
> > > > > > > I have looked through your algorithm of reading partition consistently.
> > > > > > > And I have some questions/comments.
> > > > > > > 
> > > > > > > 1. The algorithm requires heavy synchronization between
> > > > > 
> > > > > checkpoint-thread
> > > > > > > and new-approach-rebalance-threads,
> > > > > > > because you need strong guarantees to not start writing or reading to
> > > > > > > chunk which was updated or started reading by the counterpart.
> > > > > > > 
> > > > > > > 2. Also, if we have started transferring this chunk in original
> > > > > 
> > > > > partition
> > > > > > > couldn't be updated by checkpoint-threads. They should wait for
> > > > > 
> > > > > transfer
> > > > > > > finishing.
> > > > > > > 
> > > > > > > 3. If sending is slow and partition is updated then in worst case
> > > > > > > checkpoint-threads would create the whole copy of the partition.
> > > > > > > 
> > > > > > > So, what we have:
> > > > > > > -on every page write checkpoint-thread should synchronize with
> > > > > > > new-approach-rebalance-threads;
> > > > > > > -checkpoint-thread should do extra-work, sometimes this could be as
> > > > > 
> > > > > huge
> > > > > > > as copying the whole partition.
> > > > > > > 
> > > > > > > 
> > > > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > > > > 
> > > > > ilya.kasnacheev@gmail.com>
> > > > > > > wrote:
> > > > > > > 
> > > > > > > > Hello!
> > > > > > > > 
> > > > > > > > This proposal will also happily break my compression-with-dictionary
> > > > > 
> > > > > patch
> > > > > > > > since it relies currently on only having local dictionaries.
> > > > > > > > 
> > > > > > > > However, when you have compressed data, maybe speed boost is even
> > > > > 
> > > > > greater
> > > > > > > > with your approach.
> > > > > > > > 
> > > > > > > > Regards,
> > > > > > > > --
> > > > > > > > Ilya Kasnacheev
> > > > > > > > 
> > > > > > > > 
> > > > > > > > пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <ma...@gmail.com>:
> > > > > > > > 
> > > > > > > > > Igniters,
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > I'd like to take the next step of increasing the Apache Ignite with
> > > > > > > > > enabled persistence rebalance speed. Currently, the rebalancing
> > > > > > > > > procedure doesn't utilize the network and storage device throughout
> > > > > 
> > > > > to
> > > > > > > > > its full extent even with enough meaningful values of
> > > > > > > > > rebalanceThreadPoolSize property. As part of the previous discussion
> > > > > > > > > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> > > > > > > > > idea [3] of transferring cache partition files over the network.
> > > > > > > > > From my point, the case to which this type of rebalancing procedure
> > > > > > > > > can bring the most benefit – is adding a completely new node or set
> > > > > 
> > > > > of
> > > > > > > > > new nodes to the cluster. Such a scenario implies fully relocation
> > > > > 
> > > > > of
> > > > > > > > > cache partition files to the new node. To roughly estimate the
> > > > > > > > > superiority of partition file transmitting over the network the
> > > > > 
> > > > > native
> > > > > > > > > Linux scp\rsync commands can be used. My test environment showed the
> > > > > > > > > result of the new approach as 270 MB/s vs the current 40 MB/s
> > > > > > > > > single-threaded rebalance speed.
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > I've prepared the design document IEP-28 [4] and accumulated all the
> > > > > > > > > process details of a new rebalance approach on that page. Below you
> > > > > > > > > can find the most significant details of the new rebalance procedure
> > > > > > > > > and components of the Apache Ignite which are proposed to change.
> > > > > > > > > 
> > > > > > > > > Any feedback is very appreciated.
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > *PROCESS OVERVIEW*
> > > > > > > > > 
> > > > > > > > > The whole process is described in terms of rebalancing single cache
> > > > > > > > > group and partition files would be rebalanced one-by-one:
> > > > > > > > > 
> > > > > > > > > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > > > > > > > > supplier node;
> > > > > > > > > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> > > > > > > > > starts the new checkpoint process;
> > > > > > > > > 3. The supplier node creates empty the temporary cache partition
> > > > > 
> > > > > file
> > > > > > > > > with .tmp postfix in the same cache persistence directory;
> > > > > > > > > 4. The supplier node splits the whole cache partition file into
> > > > > > > > > virtual chunks of predefined size (multiply to the PageMemory size);
> > > > > > > > > 4.1. If the concurrent checkpoint thread determines the appropriate
> > > > > > > > > cache partition file chunk and tries to flush dirty page to the
> > > > > 
> > > > > cache
> > > > > > > > > partition file
> > > > > > > > > 4.1.1. If rebalance chunk already transferred
> > > > > > > > > 4.1.1.1. Flush the dirty page to the file;
> > > > > > > > > 4.1.2. If rebalance chunk not transferred
> > > > > > > > > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > > > > > > > 4.1.2.2. Flush the dirty page to the file;
> > > > > > > > > 4.2. The node starts sending to the demander node each cache
> > > > > 
> > > > > partition
> > > > > > > > > file chunk one by one using FileChannel#transferTo
> > > > > > > > > 4.2.1. If the current chunk was modified by checkpoint thread – read
> > > > > > > > > it from the temporary cache partition file;
> > > > > > > > > 4.2.2. If the current chunk is not touched – read it from the
> > > > > 
> > > > > original
> > > > > > > > > cache partition file;
> > > > > > > > > 5. The demander node starts to listen to new pipe incoming
> > > > > 
> > > > > connections
> > > > > > > > > from the supplier node on TcpCommunicationSpi;
> > > > > > > > > 6. The demander node creates the temporary cache partition file with
> > > > > > > > > .tmp postfix in the same cache persistence directory;
> > > > > > > > > 7. The demander node receives each cache partition file chunk one
> > > > > 
> > > > > by one
> > > > > > > > > 7.1. The node checks CRC for each PageMemory in the downloaded
> > > > > 
> > > > > chunk;
> > > > > > > > > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > > > > > > > > partition file position;
> > > > > > > > > 8. When the demander node receives the whole cache partition file
> > > > > > > > > 8.1. The node initializes received .tmp file as its appropriate
> > > > > 
> > > > > cache
> > > > > > > > > partition file;
> > > > > > > > > 8.2. Thread-per-partition begins to apply for data entries from the
> > > > > > > > > beginning of WAL-temporary storage;
> > > > > > > > > 8.3. All async operations corresponding to this partition file still
> > > > > > > > > write to the end of temporary WAL;
> > > > > > > > > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > > > > > > > > 8.4.1. Start the first checkpoint;
> > > > > > > > > 8.4.2. Wait for the first checkpoint ends and own the cache
> > > > > 
> > > > > partition;
> > > > > > > > > 8.4.3. All operations now are switched to the partition file instead
> > > > > > > > > of writing to the temporary WAL;
> > > > > > > > > 8.4.4. Schedule the temporary WAL storage deletion;
> > > > > > > > > 9. The supplier node deletes the temporary cache partition file;
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > *COMPONENTS TO CHANGE*
> > > > > > > > > 
> > > > > > > > > CommunicationSpi
> > > > > > > > > 
> > > > > > > > > To benefit from zero copy we must delegate the file transferring to
> > > > > > > > > FileChannel#transferTo(long, long,
> > > > > > > > > java.nio.channels.WritableByteChannel) because the fast path of
> > > > > > > > > transferTo method is only executed if the destination buffer
> > > > > 
> > > > > inherits
> > > > > > > > > from an internal JDK class.
> > > > > > > > > 
> > > > > > > > > Preloader
> > > > > > > > > 
> > > > > > > > > A new implementation of cache entries preloader assume to be done.
> > > > > 
> > > > > The
> > > > > > > > > new implementation must send and receive cache partition files over
> > > > > > > > > the CommunicationSpi channels by chunks of data with validation
> > > > > > > > > received items. The new layer over the cache partition file must
> > > > > > > > > support direct using of FileChannel#transferTo method over the
> > > > > > > > > CommunicationSpi pipe connection. The connection bandwidth of the
> > > > > > > > > cache partition file transfer must have the ability to be limited at
> > > > > > > > > runtime.
> > > > > > > > > 
> > > > > > > > > Checkpointer
> > > > > > > > > 
> > > > > > > > > When the supplier node receives the cache partition file demand
> > > > > > > > > request it will send the file over the CommunicationSpi. The cache
> > > > > > > > > partition file can be concurrently updated by checkpoint thread
> > > > > 
> > > > > during
> > > > > > > > > its transmission. To guarantee the file consistency Сheckpointer
> > > > > 
> > > > > must
> > > > > > > > > use copy-on-write technique and save a copy of updated chunk into
> > > > > 
> > > > > the
> > > > > > > > > temporary file.
> > > > > > > > > 
> > > > > > > > > (new) Catch-up temporary WAL
> > > > > > > > > 
> > > > > > > > > While the demander node is in the partition file transmission state
> > > > > 
> > > > > it
> > > > > > > > > must save all cache entries corresponding to the moving partition
> > > > > 
> > > > > into
> > > > > > > > > a new temporary WAL storage. These entries will be applied later one
> > > > > > > > > by one on the received cache partition file. All asynchronous
> > > > > > > > > operations will be enrolled to the end of temporary WAL storage
> > > > > 
> > > > > during
> > > > > > > > > storage reads until it becomes fully read. The file-based FIFO
> > > > > > > > > approach assumes to be used by this process.
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > *RECOVERY*
> > > > > > > > > 
> > > > > > > > > In case of crash recovery, there is no additional actions need to be
> > > > > > > > > applied to keep the cache partition file consistency. We are not
> > > > > > > > > recovering partition with the moving state, thus the single
> > > > > 
> > > > > partition
> > > > > > > > > file will be lost and only it. The uniqueness of it is guaranteed by
> > > > > > > > > the single-file-transmission process. The cache partition file will
> > > > > 
> > > > > be
> > > > > > > > > fully loaded on the next rebalance procedure.
> > > > > > > > > 
> > > > > > > > > To provide default cluster recovery guarantee we must to:
> > > > > > > > > 1. Start the checkpoint process when the temporary WAL storage
> > > > > 
> > > > > becomes
> > > > > > > > > empty;
> > > > > > > > > 2. Wait for the first checkpoint ends and set owning status to the
> > > > > > > > > cache partition;
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > [1]
> > > > > > > > > 
> > > > > 
> > > > > http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > > > > > > > [2]
> > > > > > > > > 
> > > > > 
> > > > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > > > > > > > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > > > > > > > [4]
> > > > > > > > > 
> > > > > 
> > > > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > > > > > > > 

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Maxim Muzafarov <ma...@gmail.com>.
Igniters,


Since the file transmission between Ignite nodes [2] has been merged
to the master branch (it is the first mandatory part of the file-based
rebalance procedure), I'd like to focus on the next step of the current
IEP-28 - the process of creating snapshots of cache group partitions.

Previously, we've discussed the creation of cache group backups [3] for
the whole cluster. I'd like to take into account the GridGain
experience with such snapshot creation and, at first, focus on the
local internal IgniteBackupManager which will be used for that purpose
[4] [1].

Changes are almost ready. I need some additional time to finalize the
PR with backup creation.


API (create local partition copies)

/**
 * @param name Unique backup name.
 * @param parts Map of cache group id to the set of partition ids to be backed up.
 * @param dir Local backup directory.
 * @param backupSvc Executor service to run the backup on (this parameter can be completely optional).
 */
public IgniteInternalFuture<?> backup(
    String name,
    Map<Integer, Set<Integer>> parts,
    File dir,
    ExecutorService backupSvc
);
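
To make the intended usage clearer, here is a rough sketch of how the local
variant could be invoked. This is only my illustration, not part of the
proposed change: the backupMgr reference, the group id value and the chosen
partitions are hypothetical and may differ once the PR is finalized.

// All identifiers below (backupMgr, workDir, the group id) are illustrative only.
Map<Integer, Set<Integer>> parts = new HashMap<>();
parts.put(1, new HashSet<>(Arrays.asList(0, 1, 2))); // group id -> partition ids

IgniteInternalFuture<?> fut = backupMgr.backup(
    "backup-2019-06-01",           // unique backup name
    parts,                         // partitions to copy
    new File(workDir, "backup"),   // local backup directory
    null                           // backupSvc is optional, null -> default pool
);

fut.get(); // wait until the local partition copies are created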


API (backup partitions over the network)

/**
 * @param name Unique backup name.
 * @param parts Map of cache group id to the set of partition ids to be backed up.
 * @param snd File sender provider.
 */
public IgniteInternalFuture<?> backup(
    String name,
    Map<Integer, Set<Integer>> parts,
    Supplier<GridIoManager.TransmissionSender> snd
);
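
Similarly, a rough sketch of how the network variant might be used on the
supplier side. Again, this is only my assumption for illustration: the ioMgr
reference, the openTransmissionSender factory method name, the remote node id
and the topic are not part of the proposal; only the
Supplier<GridIoManager.TransmissionSender> parameter itself comes from the
API above.

// All identifiers below (backupMgr, ioMgr, rmtNodeId, topic) are illustrative only.
IgniteInternalFuture<?> fut = backupMgr.backup(
    "backup-2019-06-01",
    parts,                                                // same group -> partitions map as above
    () -> ioMgr.openTransmissionSender(rmtNodeId, topic)  // sender factory, method name assumed
);

fut.get(); // completes when all requested partition files have been transmitted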

[1] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing#IEP-28:Clusterpeer-2-peerbalancing-Copypartitiononthefly
[2] https://issues.apache.org/jira/browse/IGNITE-10619
[3] http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Hot-cache-backup-td41034.html
[4] https://issues.apache.org/jira/browse/IGNITE-11073

On Wed, 12 Dec 2018 at 11:15, Vladimir Ozerov <vo...@gridgain.com> wrote:
>
> Maxim,
>
> Thank you for excellent analysis! From profiling data I see the following:
> 1) Almost no parallelism - one rebalance thread is used (default), two responses are sent per a single demand request (default)
> 2) All system resources are underutilized - CPU, disk, network
> 3) Huge hotspot on free lists
>
> In general I would recommend to consider the following points during further rebalance optimization:
> 1) Start with the fact that rebalance always causes system degradation due to additional hardware resources required. Different deployments may require different degradation modes. Sometimes "soft" mode is preferable - long rebalance with low system overhead. This is what we see now. Sometimes the opposite - as short rebalance as possible at the cost of severe degradation in operations. Sometimes - something in the middle. Every optimization we made should have clear explanation on how system degrades.
> 2) We need to investigate the hotspot on free lists. Looks like this is the main limiting factor for now. Alex, do you have any ideas what is this? Is it possible to bypass freelists completely during rebalance at the cost of higher data fragmentation during concurrent updates?
> 3) We need to investigate streaming rebalance mode, when supplier constantly streams data to demander similarly to our data streamer. It should be fairly easy to implement, applicable for all modes and may speedup rebalance up to 5-10 times. Great thing about this approach is that it will allow users to choose between system stress level and rebalance throughput easily.
> 4) File transfer rebalance: we need to have clear design of failure and concurrency cases and degradation modes. Several questions to answer:
> 4.1) What would happen if another rebalance starts when previous is not finished yet?
> 4.2) What would happen if supplier or demander fails in the middle? What kind of cleanup would be required
> 4.3) Degradation: what kind of problems should users expect due to massive disk and network load during file transfer and due to data merging on demander side?
> 4.4) Degradation: how secondary indexes would be rebuilt on demander side? Note that until indexes are ready node is not operational and cannot become partition owner, and index rebuild is essentially full data rescan with potentially the same issues with slow updates of persistent data structures we have now.
>
> Vladimir.
>
> On Fri, Dec 7, 2018 at 3:32 PM Maxim Muzafarov <ma...@gmail.com> wrote:
>>
>> Vladimir,
>>
>>
>> Let me propose to consider the whole this rebalance process as having
>> three strategies:
>> - The classical message-based approach, preferable to use for in-memory caches;
>> - Historical rebalance based on WAL, used for rebalancing persisted
>> caches deltas;
>> - (new) File-based rebalance (current IEP-28), used for relocation of
>> full cache partitions.
>>
>>
>> First of all, I want to show you that for the full cache relocation
>> file-based rebalancing strategy from my point has a set of advantages
>> prior to the message-based approach. Let's also assume that the time
>> spent on WAL logging during the rebalance procedure is already
>> optimized (we are not taking it into account at all).
>>
>> According to preliminary measurements [8] and the message above we
>> spend more than 65% of rebalancing time on creating K-V cache pair for
>> preloading entries and supporting internal data structures. It is true
>> as for in-memory cluster configuration and for a cluster with enabled
>> persistence. It is also true, that these data structures can be used
>> more efficiently by implementing batch entry processing for them. And
>> it should be done (a ticket for it is already created [3]).
>>
>> Let's have a look closer to the simple example.
>>
>> I've collected some information about a cache on my stress-testing cluster:
>> partitions (total): 65534
>> single partition size: 437 MB
>> rebalance batch: 512 Kb
>> batches per partition: 874
>> partitions per node: 606
>> batches per node: 529644
>>
>> Let's assume that we've already implemented batched entry processing
>> and we perform bulk operations over internal data structures.
>> Regarding these assumptions, we still need to process 874 batches per
>> each cache partition to transfer data. It will cost us up to 15 seconds
>> per each partition file, a lot of CPU cycles to maintain internal data
>> structures and block for a while other threads waiting for releasing
>> database checkpoint lock.
>>
>> Increasing the rebalance batch size is not efficient here because we
>> are starting to hold the database lock for too long. It will lead to
>> thread starvation and will only slow down the whole rebalance speed.
>> Exactly the same as increasing batch size, making the rebalance thread
>> pool bigger can lead to the cluster performance drop for almost the
>> same reasons.
>>
>> I think the file-based rebalance can provide us (prior to the batch
>> processing) for huge caches:
>>  - a fair non-blocking approach in each part of the rebalancing procedure;
>>  - reduce the number of locks being acquired (the other threads can
>> make bigger progress);
>>  - a zero-copy transmission on supplier saves CPU cycles and memory bandwidth;
>>  - as a result, the transferable batch size increased up to the whole
>> partition file size;
>>
>> SUMMARY TO DO
>>
>> The plan to do and other ideas (without risks evaluation):
>>
>> Message-based approach.
>> Optimization to do by priority [3] [2] and may be [9].
>>
>> Historical rebalance based on WAL.
>> Suppose, nothing to do here as Sergey already working on the issue [1]
>> with turning off WAL.
>>
>> (new) Full cache data relocation.
>> Prototyping current IEP-28.
>>
>> I think another approach can be also implemented.
>> During the rebalance procedure we can write entries to data pages
>> directly skipping free lists, PK index and secondary index. Once the
>> partition preloading is finished, we will rebuild free list and all
>> indexes.
>> Will it work for us?
>>
>> ANSWERS
>>
>> > 1) Is it correct that supplier sends only one message for every demand
>> > message? If yes, then streaming should improve network utilization a lot.
>>
>> I think we already have such ability for the Apache Ignite (not
>> exactly streaming). The CacheConfiguration#rebalanceBatchesPrefetchCnt
>> can be used here to reduce the system delay between send\receive
>> message process. The default value is more than enough for most of the
>> cases. The testing results showed only 7 seconds (0.32%) delay during
>> the 40 min cache rebalance procedure. So, each supply message is ready
>> to be sent when the next demand message arrives.
>>
>>
>> > 2) Is it correct that for user caches we process supply messages in a
>> > system pool? Did we consider moving it to striped pool? Because if all
>> > operations on a single partition is ordered, we may apply a number of
>> > critical optimizations - bypassing page cache and checkpointer for new
>> > entries, batched index updates, batched free list updates, etc.
>>
>> I think the rebalance procedure should not cause a thousand messages
>> per second, so we don't need to move the rebalance procedure to the
>> striped pool. We should have a limited stable load for rebalancing
>> procedure on the cluster. As for the second part, are you talking
>> about thread per partition model? If yes, we have tickets for it [4],
>> [5], [6].
>>
>> > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
>> > conditions when it could be disabled on supplier side?
>>
>> Do you mean the demander side? Why we should try to disable it on the
>> supplier node? I do not take it into account at all because it can be
>> easily done (suppose issue [1] is about it). But it doesn't help us
>> much for the full cache relocations.
>>
>> > 4) Most important - have we tried to profile plain single-threaded
>> > rebalance without concurrent write load? We need to have clear
>> > understanding on where time is spent - supplier/demander, cpu/network/disk,
>> > etc. Some Java tracing code should help.
>>
>> I've updated some information about profiling results on the
>> confluence page [8]. If you will find that I've missed something or
>> information is unclear, please, let me know and I will fix it.
>>
>> > And one question regarding proposed implementation - how are we going to
>> > handle secondary indexes?
>>
>> Thank you for pointing this out. Actually, the current IEP page
>> doesn't cover this case. I think we can schedule rebuild indexes after
>> all partition files would be transferred. This approach was also
>> mentioned at [2] issue.
>> Will that be correct?
>>
>>
>> [1] https://issues.apache.org/jira/browse/IGNITE-10505
>> [2] https://issues.apache.org/jira/browse/IGNITE-7934
>> [3] https://issues.apache.org/jira/browse/IGNITE-7935
>>
>> [4] https://issues.apache.org/jira/browse/IGNITE-4682
>> [5] https://issues.apache.org/jira/browse/IGNITE-4506
>> [6] https://issues.apache.org/jira/browse/IGNITE-4680
>>
>> [7] https://issues.apache.org/jira/browse/IGNITE-7027
>> [8] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
>> [9] https://issues.apache.org/jira/browse/IGNITE-9520
>> On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
>> >
>> > Maxim,
>> >
>> > Regarding MVCC - this is essentially a copy-on-write approach. New entry is
>> > created on every update. They are cleaned asynchronously by dedicated
>> > threads (aka "vacuum").
>> >
>> > I looked at the document you mentioned, thank you for pointing to it. But
>> > it doesn't answer all questions around existing design, and what I am
>> > trying to do is to get how deep do we understand current problems. It is
>> > very true that various subsystems, such as buffer managers, WALs,
>> > supporting structures, etc. incur very serious overhead. And when it comes
>> > to heavy operations implementors typically seek for a way to bypass as much
>> > components as possible, taking in count that different shortcuts lead to
>> > different types of side effects. And IMO our very important goal for now is
>> > to create space of possible improvements, and estimate their costs, risks
>> > and applicability for product's configuration space.
>> >
>> > Let me clarify several questions about current rebalance implementation, as
>> > I am not a big expert here.
>> > 1) Is it correct that supplier sends only one message for every demand
>> > message? If yes, then streaming should improve network utilization a lot.
>> > 2) Is it correct that for user caches we process supply messages in a
>> > system pool? Did we consider moving it to striped pool? Because if all
>> > operations on a single partition is ordered, we may apply a number of
>> > critical optimizations - bypassing page cache and checkpointer for new
>> > entries, batched index updates, batched free list updates, etc.
>> > 3) Seems that WAL should no longer be a problem for us [1]. What are exact
>> > conditions when it could be disabled on supplier side?
>> > 4) Most important - have we tried to profile plain single-threaded
>> > rebalance without concurrent write load? We need to have clear
>> > understanding on where time is spent - supplier/demander, cpu/network/disk,
>> > etc. Some Java tracing code should help.
>> >
>> > And one question regarding proposed implementation - how are we going to
>> > handle secondary indexes?
>> >
>> > [1] https://issues.apache.org/jira/browse/IGNITE-8017
>> >
>> >
>> > On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
>> >
>> > > Eduard,
>> > >
>> > > Thank you very much for the discussion!
>> > >
>> > > Your algorithm looks much better for me too and easier to implement.
>> > > I'll update appropriate process points on IEP page of the proposed
>> > > rebalance procedure.
>> > > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
>> > > <ed...@gmail.com> wrote:
>> > > >
>> > > > So, after some discussion, I could describe another approach on how to
>> > > > build consistent partition on the fly.
>> > > >
>> > > > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
>> > > > 2. After checkpoint finish, we start sending partition file (without any
>> > > > lock) to the receiver from 0 to fixed size.
>> > > > 3. Next checkpoints if they detect that they would override some pages of
>> > > > transferring file should write the previous state of a page to a
>> > > dedicated
>> > > > file.
>> > > > So, we would have a list of pages written 1 by 1, page id is written in
>> > > the
>> > > > page itself so we could determine page index. Let's name it log.
>> > > > 4. When transfer finished checkpointer would stop updating log-file. Now
>> > > we
>> > > > are ready to send it to the receiver.
>> > > > 5. On receiver side we start merging the dirty partition file with log
>> > > > (updating it with pages from log-file).
>> > > >
>> > > > So, an advantage of this method:
>> > > > - checkpoint-thread work couldn't  increase more than twice;
>> > > > - checkpoint-threads shouldn't wait for anything;
>> > > > - in best case, we receive partition without any extra effort.
>> > > >
>> > > >
>> > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
>> > > > eduard.shangareev@gmail.com> wrote:
>> > > >
>> > > > > Maxim,
>> > > > >
>> > > > > I have looked through your algorithm of reading partition consistently.
>> > > > > And I have some questions/comments.
>> > > > >
>> > > > > 1. The algorithm requires heavy synchronization between
>> > > checkpoint-thread
>> > > > > and new-approach-rebalance-threads,
>> > > > > because you need strong guarantees to not start writing or reading to
>> > > > > chunk which was updated or started reading by the counterpart.
>> > > > >
>> > > > > 2. Also, if we have started transferring this chunk in original
>> > > partition
>> > > > > couldn't be updated by checkpoint-threads. They should wait for
>> > > transfer
>> > > > > finishing.
>> > > > >
>> > > > > 3. If sending is slow and partition is updated then in worst case
>> > > > > checkpoint-threads would create the whole copy of the partition.
>> > > > >
>> > > > > So, what we have:
>> > > > > -on every page write checkpoint-thread should synchronize with
>> > > > > new-approach-rebalance-threads;
>> > > > > -checkpoint-thread should do extra-work, sometimes this could be as
>> > > huge
>> > > > > as copying the whole partition.
>> > > > >
>> > > > >
>> > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
>> > > ilya.kasnacheev@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > >> Hello!
>> > > > >>
>> > > > >> This proposal will also happily break my compression-with-dictionary
>> > > patch
>> > > > >> since it relies currently on only having local dictionaries.
>> > > > >>
>> > > > >> However, when you have compressed data, maybe speed boost is even
>> > > greater
>> > > > >> with your approach.
>> > > > >>
>> > > > >> Regards,
>> > > > >> --
>> > > > >> Ilya Kasnacheev
>> > > > >>
>> > > > >>
>> > > > >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <ma...@gmail.com>:
>> > > > >>
>> > > > >> > Igniters,
>> > > > >> >
>> > > > >> >
>> > > > >> > I'd like to take the next step of increasing the Apache Ignite with
>> > > > >> > enabled persistence rebalance speed. Currently, the rebalancing
>> > > > >> > procedure doesn't utilize the network and storage device throughout
>> > > to
>> > > > >> > its full extent even with enough meaningful values of
>> > > > >> > rebalanceThreadPoolSize property. As part of the previous discussion
>> > > > >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
>> > > > >> > idea [3] of transferring cache partition files over the network.
>> > > > >> > From my point, the case to which this type of rebalancing procedure
>> > > > >> > can bring the most benefit – is adding a completely new node or set
>> > > of
>> > > > >> > new nodes to the cluster. Such a scenario implies fully relocation
>> > > of
>> > > > >> > cache partition files to the new node. To roughly estimate the
>> > > > >> > superiority of partition file transmitting over the network the
>> > > native
>> > > > >> > Linux scp\rsync commands can be used. My test environment showed the
>> > > > >> > result of the new approach as 270 MB/s vs the current 40 MB/s
>> > > > >> > single-threaded rebalance speed.
>> > > > >> >
>> > > > >> >
>> > > > >> > I've prepared the design document IEP-28 [4] and accumulated all the
>> > > > >> > process details of a new rebalance approach on that page. Below you
>> > > > >> > can find the most significant details of the new rebalance procedure
>> > > > >> > and components of the Apache Ignite which are proposed to change.
>> > > > >> >
>> > > > >> > Any feedback is very appreciated.
>> > > > >> >
>> > > > >> >
>> > > > >> > *PROCESS OVERVIEW*
>> > > > >> >
>> > > > >> > The whole process is described in terms of rebalancing single cache
>> > > > >> > group and partition files would be rebalanced one-by-one:
>> > > > >> >
>> > > > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
>> > > > >> > supplier node;
>> > > > >> > 2. When the supplier node receives GridDhtPartitionDemandMessage and
>> > > > >> > starts the new checkpoint process;
>> > > > >> > 3. The supplier node creates empty the temporary cache partition
>> > > file
>> > > > >> > with .tmp postfix in the same cache persistence directory;
>> > > > >> > 4. The supplier node splits the whole cache partition file into
>> > > > >> > virtual chunks of predefined size (multiply to the PageMemory size);
>> > > > >> > 4.1. If the concurrent checkpoint thread determines the appropriate
>> > > > >> > cache partition file chunk and tries to flush dirty page to the
>> > > cache
>> > > > >> > partition file
>> > > > >> > 4.1.1. If rebalance chunk already transferred
>> > > > >> > 4.1.1.1. Flush the dirty page to the file;
>> > > > >> > 4.1.2. If rebalance chunk not transferred
>> > > > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
>> > > > >> > 4.1.2.2. Flush the dirty page to the file;
>> > > > >> > 4.2. The node starts sending to the demander node each cache
>> > > partition
>> > > > >> > file chunk one by one using FileChannel#transferTo
>> > > > >> > 4.2.1. If the current chunk was modified by checkpoint thread – read
>> > > > >> > it from the temporary cache partition file;
>> > > > >> > 4.2.2. If the current chunk is not touched – read it from the
>> > > original
>> > > > >> > cache partition file;
>> > > > >> > 5. The demander node starts to listen to new pipe incoming
>> > > connections
>> > > > >> > from the supplier node on TcpCommunicationSpi;
>> > > > >> > 6. The demander node creates the temporary cache partition file with
>> > > > >> > .tmp postfix in the same cache persistence directory;
>> > > > >> > 7. The demander node receives each cache partition file chunk one
>> > > by one
>> > > > >> > 7.1. The node checks CRC for each PageMemory in the downloaded
>> > > chunk;
>> > > > >> > 7.2. The node flushes the downloaded chunk at the appropriate cache
>> > > > >> > partition file position;
>> > > > >> > 8. When the demander node receives the whole cache partition file
>> > > > >> > 8.1. The node initializes received .tmp file as its appropriate
>> > > cache
>> > > > >> > partition file;
>> > > > >> > 8.2. Thread-per-partition begins to apply for data entries from the
>> > > > >> > beginning of WAL-temporary storage;
>> > > > >> > 8.3. All async operations corresponding to this partition file still
>> > > > >> > write to the end of temporary WAL;
>> > > > >> > 8.4. At the moment of WAL-temporary storage is ready to be empty
>> > > > >> > 8.4.1. Start the first checkpoint;
>> > > > >> > 8.4.2. Wait for the first checkpoint ends and own the cache
>> > > partition;
>> > > > >> > 8.4.3. All operations now are switched to the partition file instead
>> > > > >> > of writing to the temporary WAL;
>> > > > >> > 8.4.4. Schedule the temporary WAL storage deletion;
>> > > > >> > 9. The supplier node deletes the temporary cache partition file;
>> > > > >> >
>> > > > >> >
>> > > > >> > *COMPONENTS TO CHANGE*
>> > > > >> >
>> > > > >> > CommunicationSpi
>> > > > >> >
>> > > > >> > To benefit from zero copy we must delegate the file transferring to
>> > > > >> > FileChannel#transferTo(long, long,
>> > > > >> > java.nio.channels.WritableByteChannel) because the fast path of
>> > > > >> > transferTo method is only executed if the destination buffer
>> > > inherits
>> > > > >> > from an internal JDK class.
>> > > > >> >
>> > > > >> > Preloader
>> > > > >> >
>> > > > >> > A new implementation of cache entries preloader assume to be done.
>> > > The
>> > > > >> > new implementation must send and receive cache partition files over
>> > > > >> > the CommunicationSpi channels by chunks of data with validation
>> > > > >> > received items. The new layer over the cache partition file must
>> > > > >> > support direct using of FileChannel#transferTo method over the
>> > > > >> > CommunicationSpi pipe connection. The connection bandwidth of the
>> > > > >> > cache partition file transfer must have the ability to be limited at
>> > > > >> > runtime.
>> > > > >> >
>> > > > >> > Checkpointer
>> > > > >> >
>> > > > >> > When the supplier node receives the cache partition file demand
>> > > > >> > request it will send the file over the CommunicationSpi. The cache
>> > > > >> > partition file can be concurrently updated by checkpoint thread
>> > > during
>> > > > >> > its transmission. To guarantee the file consistency Сheckpointer
>> > > must
>> > > > >> > use copy-on-write technique and save a copy of updated chunk into
>> > > the
>> > > > >> > temporary file.
>> > > > >> >
>> > > > >> > (new) Catch-up temporary WAL
>> > > > >> >
>> > > > >> > While the demander node is in the partition file transmission state
>> > > it
>> > > > >> > must save all cache entries corresponding to the moving partition
>> > > into
>> > > > >> > a new temporary WAL storage. These entries will be applied later one
>> > > > >> > by one on the received cache partition file. All asynchronous
>> > > > >> > operations will be enrolled to the end of temporary WAL storage
>> > > during
>> > > > >> > storage reads until it becomes fully read. The file-based FIFO
>> > > > >> > approach assumes to be used by this process.
>> > > > >> >
>> > > > >> >
>> > > > >> > *RECOVERY*
>> > > > >> >
>> > > > >> > In case of crash recovery, there is no additional actions need to be
>> > > > >> > applied to keep the cache partition file consistency. We are not
>> > > > >> > recovering partition with the moving state, thus the single
>> > > partition
>> > > > >> > file will be lost and only it. The uniqueness of it is guaranteed by
>> > > > >> > the single-file-transmission process. The cache partition file will
>> > > be
>> > > > >> > fully loaded on the next rebalance procedure.
>> > > > >> >
>> > > > >> > To provide default cluster recovery guarantee we must to:
>> > > > >> > 1. Start the checkpoint process when the temporary WAL storage
>> > > becomes
>> > > > >> > empty;
>> > > > >> > 2. Wait for the first checkpoint ends and set owning status to the
>> > > > >> > cache partition;
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> > [1]
>> > > > >> >
>> > > > >>
>> > > http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
>> > > > >> > [2]
>> > > > >> >
>> > > > >>
>> > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
>> > > > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
>> > > > >> > [4]
>> > > > >> >
>> > > > >>
>> > > https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
>> > > > >> >
>> > > > >>
>> > > > >
>> > >

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Vladimir Ozerov <vo...@gridgain.com>.
Maxim,

Thank you for excellent analysis! From profiling data I see the following:
1) Almost no parallelism - one rebalance thread is used (default), two
responses are sent per a single demand request (default)
2) All system resources are underutilized - CPU, disk, network
3) Huge hotspot on free lists

In general I would recommend to consider the following points during
further rebalance optimization:
1) Start with the fact that rebalance always causes system degradation due
to additional hardware resources required. Different deployments may
require different degradation modes. Sometimes "soft" mode is preferable -
long rebalance with low system overhead. This is what we see now. Sometimes
the opposite - as short rebalance as possible at the cost of severe
degradation in operations. Sometimes - something in the middle. Every
optimization we make should have a clear explanation of how the system degrades.
2) We need to investigate the hotspot on free lists. Looks like this is the
main limiting factor for now. Alex, do you have any ideas what is this? Is
it possible to bypass freelists completely during rebalance at the cost of
higher data fragmentation during concurrent updates?
3) We need to investigate streaming rebalance mode, when supplier
constantly streams data to demander similarly to our data streamer. It
should be fairly easy to implement, applicable for all modes and may
speedup rebalance up to 5-10 times. Great thing about this approach is that
it will allow users to choose between system stress level and rebalance
throughput easily.
4) File transfer rebalance: we need to have clear design of failure and
concurrency cases and degradation modes. Several questions to answer:
4.1) What would happen if another rebalance starts when previous is not
finished yet?
4.2) What would happen if supplier or demander fails in the middle? What
kind of cleanup would be required?
4.3) Degradation: what kind of problems should users expect due to massive
disk and network load during file transfer and due to data merging on
demander side?
4.4) Degradation: how secondary indexes would be rebuilt on demander side?
Note that until indexes are ready node is not operational and cannot become
partition owner, and index rebuild is essentially full data rescan with
potentially the same issues with slow updates of persistent data structures
we have now.

Vladimir.

On Fri, Dec 7, 2018 at 3:32 PM Maxim Muzafarov <ma...@gmail.com> wrote:

> Vladimir,
>
>
> Let me propose to consider the whole this rebalance process as having
> three strategies:
> - The classical message-based approach, preferable to use for in-memory
> caches;
> - Historical rebalance based on WAL, used for rebalancing persisted
> caches deltas;
> - (new) File-based rebalance (current IEP-28), used for relocation of
> full cache partitions.
>
>
> First of all, I want to show you that for the full cache relocation
> file-based rebalancing strategy from my point has a set of advantages
> prior to the message-based approach. Let's also assume that the time
> spent on WAL logging during the rebalance procedure is already
> optimized (we are not taking it into account at all).
>
> According to preliminary measurements [8] and the message above we
> spend more than 65% of rebalancing time on creating K-V cache pair for
> preloading entries and supporting internal data structures. It is true
> as for in-memory cluster configuration and for a cluster with enabled
> persistence. It is also true, that these data structures can be used
> more efficiently by implementing batch entry processing for them. And
> it should be done (a ticket for it is already created [3]).
>
> Let's have a look closer to the simple example.
>
> I've collected some information about a cache on my stress-testing cluster:
> partitions (total): 65534
> single partition size: 437 MB
> rebalance batch: 512 Kb
> batches per partition: 874
> partitions per node: 606
> batches per node: 529644
>
> Let's assume that we've already implemented batched entry processing
> and we perform bulk operations over internal data structures.
> Regarding these assumptions, we still need to process 874 batches per
> each cache partition to transfer data. It will cost us up to 15 seconds
> per each partition file, a lot of CPU cycles to maintain internal data
> structures and block for a while other threads waiting for releasing
> database checkpoint lock.
>
> Increasing the rebalance batch size is not efficient here because we
> are starting to hold the database lock for too long. It will lead to
> thread starvation and will only slow down the whole rebalance speed.
> Exactly the same as increasing batch size, making the rebalance thread
> pool bigger can lead to the cluster performance drop for almost the
> same reasons.
>
> I think the file-based rebalance can provide us (prior to the batch
> processing) for huge caches:
>  - a fair non-blocking approach in each part of the rebalancing procedure;
>  - reduce the number of locks being acquired (the other threads can
> make bigger progress);
>  - a zero-copy transmission on supplier saves CPU cycles and memory
> bandwidth;
>  - as a result, the transferable batch size increased up to the whole
> partition file size;
>
> SUMMARY TO DO
>
> The plan to do and other ideas (without risks evaluation):
>
> Message-based approach.
> Optimization to do by priority [3] [2] and may be [9].
>
> Historical rebalance based on WAL.
> Suppose, nothing to do here as Sergey already working on the issue [1]
> with turning off WAL.
>
> (new) Full cache data relocation.
> Prototyping current IEP-28.
>
> I think another approach can be also implemented.
> During the rebalance procedure we can write entries to data pages
> directly skipping free lists, PK index and secondary index. Once the
> partition preloading is finished, we will rebuild free list and all
> indexes.
> Will it work for us?
>
> ANSWERS
>
> > 1) Is it correct that supplier sends only one message for every demand
> > message? If yes, then streaming should improve network utilization a lot.
>
> I think we already have such ability for the Apache Ignite (not
> exactly streaming). The CacheConfiguration#rebalanceBatchesPrefetchCnt
> can be used here to reduce the system delay between send\receive
> message process. The default value is more than enough for most of the
> cases. The testing results showed only 7 seconds (0.32%) delay during
> the 40 min cache rebalance procedure. So, each supply message is ready
> to be sent when the next demand message arrives.
>
>
> > 2) Is it correct that for user caches we process supply messages in a
> > system pool? Did we consider moving it to striped pool? Because if all
> > operations on a single partition is ordered, we may apply a number of
> > critical optimizations - bypassing page cache and checkpointer for new
> > entries, batched index updates, batched free list updates, etc.
>
> I think the rebalance procedure should not cause a thousand messages
> per second, so we don't need to move the rebalance procedure to the
> striped pool. We should have a limited stable load for rebalancing
> procedure on the cluster. As for the second part, are you talking
> about thread per partition model? If yes, we have tickets for it [4],
> [5], [6].
>
> > 3) Seems that WAL should no longer be a problem for us [1]. What are
> exact
> > conditions when it could be disabled on supplier side?
>
> Do you mean the demander side? Why we should try to disable it on the
> supplier node? I do not take it into account at all because it can be
> easily done (suppose issue [1] is about it). But it doesn't help us
> much for the full cache relocations.
>
> > 4) Most important - have we tried to profile plain single-threaded
> > rebalance without concurrent write load? We need to have clear
> > understanding on where time is spent - supplier/demander,
> cpu/network/disk,
> > etc. Some Java tracing code should help.
>
> I've updated some information about profiling results on the
> confluence page [8]. If you will find that I've missed something or
> information is unclear, please, let me know and I will fix it.
>
> > And one question regarding proposed implementation - how are we going to
> > handle secondary indexes?
>
> Thank you for pointing this out. Actually, the current IEP page
> doesn't cover this case. I think we can schedule rebuild indexes after
> all partition files would be transferred. This approach was also
> mentioned at [2] issue.
> Will that be correct?
>
>
> [1] https://issues.apache.org/jira/browse/IGNITE-10505
> [2] https://issues.apache.org/jira/browse/IGNITE-7934
> [3] https://issues.apache.org/jira/browse/IGNITE-7935
>
> [4] https://issues.apache.org/jira/browse/IGNITE-4682
> [5] https://issues.apache.org/jira/browse/IGNITE-4506
> [6] https://issues.apache.org/jira/browse/IGNITE-4680
>
> [7] https://issues.apache.org/jira/browse/IGNITE-7027
> [8]
> https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
> [9] https://issues.apache.org/jira/browse/IGNITE-9520
> On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com>
> wrote:
> >
> > Maxim,
> >
> > Regarding MVCC - this is essentially a copy-on-write approach. New entry
> is
> > created on every update. They are cleaned asynchronously by dedicated
> > threads (aka "vacuum").
> >
> > I looked at the document you mentioned, thank you for pointing to it. But
> > it doesn't answer all questions around existing design, and what I am
> > trying to do is to get how deep do we understand current problems. It is
> > very true that various subsystems, such as buffer managers, WALs,
> > supporting structures, etc. incur very serious overhead. And when it
> comes
> > to heavy operations implementors typically seek for a way to bypass as
> much
> > components as possible, taking in count that different shortcuts lead to
> > different types of side effects. And IMO our very important goal for now
> is
> > to create space of possible improvements, and estimate their costs, risks
> > and applicability for product's configuration space.
> >
> > Let me clarify several questions about current rebalance implementation,
> as
> > I am not a big expert here.
> > 1) Is it correct that supplier sends only one message for every demand
> > message? If yes, then streaming should improve network utilization a lot.
> > 2) Is it correct that for user caches we process supply messages in a
> > system pool? Did we consider moving it to striped pool? Because if all
> > operations on a single partition is ordered, we may apply a number of
> > critical optimizations - bypassing page cache and checkpointer for new
> > entries, batched index updates, batched free list updates, etc.
> > 3) Seems that WAL should no longer be a problem for us [1]. What are
> exact
> > conditions when it could be disabled on supplier side?
> > 4) Most important - have we tried to profile plain single-threaded
> > rebalance without concurrent write load? We need to have clear
> > understanding on where time is spent - supplier/demander,
> cpu/network/disk,
> > etc. Some Java tracing code should help.
> >
> > And one question regarding proposed implementation - how are we going to
> > handle secondary indexes?
> >
> > [1] https://issues.apache.org/jira/browse/IGNITE-8017
> >
> >
> > On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com>
> wrote:
> >
> > > Eduard,
> > >
> > > Thank you very much for the discussion!
> > >
> > > Your algorithm looks much better for me too and easier to implement.
> > > I'll update appropriate process points on IEP page of the proposed
> > > rebalance procedure.
> > > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > > <ed...@gmail.com> wrote:
> > > >
> > > > So, after some discussion, I could describe another approach on how
> to
> > > > build consistent partition on the fly.
> > > >
> > > > 1. We make a checkpoint, fix the size of the partition in
> OffheapManager.
> > > > 2. After checkpoint finish, we start sending partition file (without
> any
> > > > lock) to the receiver from 0 to fixed size.
> > > > 3. Next checkpoints if they detect that they would override some
> pages of
> > > > transferring file should write the previous state of a page to a
> > > dedicated
> > > > file.
> > > > So, we would have a list of pages written 1 by 1, page id is written
> in
> > > the
> > > > page itself so we could determine page index. Let's name it log.
> > > > 4. When transfer finished checkpointer would stop updating log-file.
> Now
> > > we
> > > > are ready to send it to the receiver.
> > > > 5. On receiver side we start merging the dirty partition file with
> log
> > > > (updating it with pages from log-file).
> > > >
> > > > So, an advantage of this method:
> > > > - checkpoint-thread work couldn't  increase more than twice;
> > > > - checkpoint-threads shouldn't wait for anything;
> > > > - in best case, we receive partition without any extra effort.
> > > >
> > > >
> > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > > eduard.shangareev@gmail.com> wrote:
> > > >
> > > > > Maxim,
> > > > >
> > > > > I have looked through your algorithm of reading partition
> consistently.
> > > > > And I have some questions/comments.
> > > > >
> > > > > 1. The algorithm requires heavy synchronization between
> > > checkpoint-thread
> > > > > and new-approach-rebalance-threads,
> > > > > because you need strong guarantees to not start writing or reading
> to
> > > > > chunk which was updated or started reading by the counterpart.
> > > > >
> > > > > 2. Also, if we have started transferring this chunk in original
> > > partition
> > > > > couldn't be updated by checkpoint-threads. They should wait for
> > > transfer
> > > > > finishing.
> > > > >
> > > > > 3. If sending is slow and partition is updated then in worst case
> > > > > checkpoint-threads would create the whole copy of the partition.
> > > > >
> > > > > So, what we have:
> > > > > -on every page write checkpoint-thread should synchronize with
> > > > > new-approach-rebalance-threads;
> > > > > -checkpoint-thread should do extra-work, sometimes this could be as
> > > huge
> > > > > as copying the whole partition.
> > > > >
> > > > >
> > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > > ilya.kasnacheev@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hello!
> > > > >>
> > > > >> This proposal will also happily break my
> compression-with-dictionary
> > > patch
> > > > >> since it relies currently on only having local dictionaries.
> > > > >>
> > > > >> However, when you have compressed data, maybe speed boost is even
> > > greater
> > > > >> with your approach.
> > > > >>
> > > > >> Regards,
> > > > >> --
> > > > >> Ilya Kasnacheev
> > > > >>
> > > > >>
> > > > >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <maxmuzaf@gmail.com
> >:
> > > > >>
> > > > >> > Igniters,
> > > > >> >
> > > > >> >
> > > > >> > I'd like to take the next step of increasing the Apache Ignite
> with
> > > > >> > enabled persistence rebalance speed. Currently, the rebalancing
> > > > >> > procedure doesn't utilize the network and storage device
> throughout
> > > to
> > > > >> > its full extent even with enough meaningful values of
> > > > >> > rebalanceThreadPoolSize property. As part of the previous
> discussion
> > > > >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed
> an
> > > > >> > idea [3] of transferring cache partition files over the network.
> > > > >> > From my point, the case to which this type of rebalancing
> procedure
> > > > >> > can bring the most benefit – is adding a completely new node or
> set
> > > of
> > > > >> > new nodes to the cluster. Such a scenario implies fully
> relocation
> > > of
> > > > >> > cache partition files to the new node. To roughly estimate the
> > > > >> > superiority of partition file transmitting over the network the
> > > native
> > > > >> > Linux scp\rsync commands can be used. My test environment
> showed the
> > > > >> > result of the new approach as 270 MB/s vs the current 40 MB/s
> > > > >> > single-threaded rebalance speed.
> > > > >> >
> > > > >> >
> > > > >> > I've prepared the design document IEP-28 [4] and accumulated
> all the
> > > > >> > process details of a new rebalance approach on that page. Below
> you
> > > > >> > can find the most significant details of the new rebalance
> procedure
> > > > >> > and components of the Apache Ignite which are proposed to
> change.
> > > > >> >
> > > > >> > Any feedback is very appreciated.
> > > > >> >
> > > > >> >
> > > > >> > *PROCESS OVERVIEW*
> > > > >> >
> > > > >> > The whole process is described in terms of rebalancing single
> cache
> > > > >> > group and partition files would be rebalanced one-by-one:
> > > > >> >
> > > > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to
> the
> > > > >> > supplier node;
> > > > >> > 2. When the supplier node receives
> GridDhtPartitionDemandMessage and
> > > > >> > starts the new checkpoint process;
> > > > >> > 3. The supplier node creates empty the temporary cache partition
> > > file
> > > > >> > with .tmp postfix in the same cache persistence directory;
> > > > >> > 4. The supplier node splits the whole cache partition file into
> > > > >> > virtual chunks of predefined size (multiply to the PageMemory
> size);
> > > > >> > 4.1. If the concurrent checkpoint thread determines the
> appropriate
> > > > >> > cache partition file chunk and tries to flush dirty page to the
> > > cache
> > > > >> > partition file
> > > > >> > 4.1.1. If rebalance chunk already transferred
> > > > >> > 4.1.1.1. Flush the dirty page to the file;
> > > > >> > 4.1.2. If rebalance chunk not transferred
> > > > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > > >> > 4.1.2.2. Flush the dirty page to the file;
> > > > >> > 4.2. The node starts sending to the demander node each cache
> > > partition
> > > > >> > file chunk one by one using FileChannel#transferTo
> > > > >> > 4.2.1. If the current chunk was modified by checkpoint thread –
> read
> > > > >> > it from the temporary cache partition file;
> > > > >> > 4.2.2. If the current chunk is not touched – read it from the
> > > original
> > > > >> > cache partition file;
> > > > >> > 5. The demander node starts to listen to new pipe incoming
> > > connections
> > > > >> > from the supplier node on TcpCommunicationSpi;
> > > > >> > 6. The demander node creates the temporary cache partition file
> with
> > > > >> > .tmp postfix in the same cache persistence directory;
> > > > >> > 7. The demander node receives each cache partition file chunk
> one
> > > by one
> > > > >> > 7.1. The node checks CRC for each PageMemory in the downloaded
> > > chunk;
> > > > >> > 7.2. The node flushes the downloaded chunk at the appropriate
> cache
> > > > >> > partition file position;
> > > > >> > 8. When the demander node receives the whole cache partition
> file
> > > > >> > 8.1. The node initializes received .tmp file as its appropriate
> > > cache
> > > > >> > partition file;
> > > > >> > 8.2. Thread-per-partition begins to apply for data entries from
> the
> > > > >> > beginning of WAL-temporary storage;
> > > > >> > 8.3. All async operations corresponding to this partition file
> still
> > > > >> > write to the end of temporary WAL;
> > > > >> > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > > > >> > 8.4.1. Start the first checkpoint;
> > > > >> > 8.4.2. Wait for the first checkpoint ends and own the cache
> > > partition;
> > > > >> > 8.4.3. All operations now are switched to the partition file
> instead
> > > > >> > of writing to the temporary WAL;
> > > > >> > 8.4.4. Schedule the temporary WAL storage deletion;
> > > > >> > 9. The supplier node deletes the temporary cache partition file;
> > > > >> >
> > > > >> >
> > > > >> > *COMPONENTS TO CHANGE*
> > > > >> >
> > > > >> > CommunicationSpi
> > > > >> >
> > > > >> > To benefit from zero copy we must delegate the file
> transferring to
> > > > >> > FileChannel#transferTo(long, long,
> > > > >> > java.nio.channels.WritableByteChannel) because the fast path of
> > > > >> > transferTo method is only executed if the destination buffer
> > > inherits
> > > > >> > from an internal JDK class.
> > > > >> >
> > > > >> > Preloader
> > > > >> >
> > > > >> > A new implementation of cache entries preloader assume to be
> done.
> > > The
> > > > >> > new implementation must send and receive cache partition files
> over
> > > > >> > the CommunicationSpi channels by chunks of data with validation
> > > > >> > received items. The new layer over the cache partition file must
> > > > >> > support direct using of FileChannel#transferTo method over the
> > > > >> > CommunicationSpi pipe connection. The connection bandwidth of
> the
> > > > >> > cache partition file transfer must have the ability to be
> limited at
> > > > >> > runtime.
> > > > >> >
> > > > >> > Checkpointer
> > > > >> >
> > > > >> > When the supplier node receives the cache partition file demand
> > > > >> > request it will send the file over the CommunicationSpi. The
> cache
> > > > >> > partition file can be concurrently updated by checkpoint thread
> > > during
> > > > >> > its transmission. To guarantee the file consistency Сheckpointer
> > > must
> > > > >> > use copy-on-write technique and save a copy of updated chunk
> into
> > > the
> > > > >> > temporary file.
> > > > >> >
> > > > >> > (new) Catch-up temporary WAL
> > > > >> >
> > > > >> > While the demander node is in the partition file transmission
> state
> > > it
> > > > >> > must save all cache entries corresponding to the moving
> partition
> > > into
> > > > >> > a new temporary WAL storage. These entries will be applied
> later one
> > > > >> > by one on the received cache partition file. All asynchronous
> > > > >> > operations will be enrolled to the end of temporary WAL storage
> > > during
> > > > >> > storage reads until it becomes fully read. The file-based FIFO
> > > > >> > approach assumes to be used by this process.
> > > > >> >
> > > > >> >
> > > > >> > *RECOVERY*
> > > > >> >
> > > > >> > In case of crash recovery, there is no additional actions need
> to be
> > > > >> > applied to keep the cache partition file consistency. We are not
> > > > >> > recovering partition with the moving state, thus the single
> > > partition
> > > > >> > file will be lost and only it. The uniqueness of it is
> guaranteed by
> > > > >> > the single-file-transmission process. The cache partition file
> will
> > > be
> > > > >> > fully loaded on the next rebalance procedure.
> > > > >> >
> > > > >> > To provide default cluster recovery guarantee we must to:
> > > > >> > 1. Start the checkpoint process when the temporary WAL storage
> > > becomes
> > > > >> > empty;
> > > > >> > 2. Wait for the first checkpoint ends and set owning status to
> the
> > > > >> > cache partition;
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > [1]
> > > > >> >
> > > > >>
> > >
> http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > > >> > [2]
> > > > >> >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > > >> > [4]
> > > > >> >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > > >> >
> > > > >>
> > > > >
> > >
>

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Maxim Muzafarov <ma...@gmail.com>.
Vladimir,


Let me propose to consider this whole rebalance process as having
three strategies:
- The classical message-based approach, preferable for in-memory caches;
- Historical rebalance based on WAL, used for rebalancing the deltas of
persisted caches;
- (new) File-based rebalance (the current IEP-28), used for relocation of
full cache partitions.


First of all, I want to show that for full cache relocation the
file-based rebalancing strategy, from my point of view, has a set of
advantages over the message-based approach. Let's also assume that the
time spent on WAL logging during the rebalance procedure is already
optimized (we are not taking it into account at all).

According to preliminary measurements [8] and the message above, we
spend more than 65% of rebalancing time on creating the K-V cache pair
for preloaded entries and maintaining internal data structures. This is
true both for an in-memory cluster configuration and for a cluster with
persistence enabled. It is also true that these data structures can be
used more efficiently by implementing batch entry processing for them.
And it should be done (a ticket for it has already been created [3]).

Let's have a closer look at a simple example.

I've collected some information about a cache on my stress-testing cluster:
partitions (total): 65534
single partition size: 437 MB
rebalance batch: 512 KB
batches per partition: 874
partitions per node: 606
batches per node: 529644
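
Just to make the arithmetic behind these numbers explicit, here is a
tiny sketch with the figures above hard-coded (the values are
illustrative only, they are not read from any cluster API):

// Rough arithmetic behind the example above (all values hard-coded).
public class RebalanceBatchEstimate {
    public static void main(String[] args) {
        long partSizeBytes = 437L * 1024 * 1024; // single partition size, ~437 MB
        long batchSizeBytes = 512L * 1024;       // rebalance batch size, 512 KB
        int partsPerNode = 606;                  // partitions assigned to one node

        long batchesPerPart = partSizeBytes / batchSizeBytes; // = 874
        long batchesPerNode = batchesPerPart * partsPerNode;  // = 529,644

        System.out.println("Batches per partition: " + batchesPerPart);
        System.out.println("Batches per node:      " + batchesPerNode);
    }
}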

Let's assume that we've already implemented batched entry processing
and that we perform bulk operations over the internal data structures.
Even with these assumptions, we still need to process 874 batches for
each cache partition to transfer the data. It will cost us up to 15
seconds per partition file, a lot of CPU cycles to maintain internal
data structures, and it will block other threads for a while as they
wait for the database checkpoint lock to be released.

Increasing the rebalance batch size is not efficient here because we
start to hold the database lock for too long. That leads to thread
starvation and only slows down the whole rebalance. Likewise, making
the rebalance thread pool bigger can cause a cluster performance drop
for almost the same reasons.

I think that for huge caches the file-based rebalance can provide us
(compared to the batch processing) with:
 - a fair non-blocking approach in each part of the rebalancing procedure;
 - fewer acquired locks (so the other threads can make more progress);
 - zero-copy transmission on the supplier, which saves CPU cycles and
memory bandwidth (see the transferTo sketch below);
 - as a result, a transferable batch size that grows up to the whole
partition file size.
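
To illustrate the zero-copy point, here is a minimal sketch of sending
one partition file chunk with FileChannel#transferTo over a plain
blocking socket channel (the class, the method and the chunking are
made up for the example; this is not the actual Preloader code):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PartitionChunkSender {
    /** Sends bytes [pos, pos + len) of the partition file to the receiver with zero-copy. */
    static void sendChunk(Path partFile, InetSocketAddress receiver, long pos, long len)
        throws IOException {
        try (FileChannel src = FileChannel.open(partFile, StandardOpenOption.READ);
             SocketChannel out = SocketChannel.open(receiver)) {
            long transferred = 0;

            // transferTo may send fewer bytes than requested, so loop until the chunk is done.
            while (transferred < len)
                transferred += src.transferTo(pos + transferred, len - transferred, out);
        }
    }
}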

SUMMARY TO DO

The plan to do and other ideas (without risks evaluation):

Message-based approach.
Optimizations to do, by priority: [3], [2] and maybe [9].

Historical rebalance based on WAL.
I suppose there is nothing to do here, as Sergey is already working on
the issue [1] of turning off WAL.

(new) Full cache data relocation.
Prototyping the current IEP-28.

I think another approach can also be implemented.
During the rebalance procedure we can write entries to data pages
directly, skipping the free lists, the PK index and the secondary
indexes. Once the partition preloading is finished, we rebuild the free
list and all indexes (a rough sketch of this idea follows below).
Will it work for us?
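
Very roughly, the idea could look like the sketch below. All the helper
methods here are hypothetical placeholders that only name the intended
steps; they do not correspond to existing Ignite APIs:

import java.util.Map;

// Hypothetical sketch: preload data pages first, rebuild supporting structures later.
public class DirectPreloadSketch {
    void preloadPartition(Iterable<Map.Entry<byte[], byte[]>> rows, int partId) {
        for (Map.Entry<byte[], byte[]> row : rows)
            writeRowToDataPages(partId, row); // append the row to data pages, skip free list and indexes

        rebuildFreeList(partId);  // restore the free list once preloading is finished
        rebuildIndexes(partId);   // rebuild the PK and secondary indexes in one pass
    }

    void writeRowToDataPages(int partId, Map.Entry<byte[], byte[]> row) { /* placeholder */ }

    void rebuildFreeList(int partId) { /* placeholder */ }

    void rebuildIndexes(int partId) { /* placeholder */ }
}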

ANSWERS

> 1) Is it correct that supplier sends only one message for every demand
> message? If yes, then streaming should improve network utilization a lot.

I think we already have such an ability in Apache Ignite (not exactly
streaming). The CacheConfiguration#rebalanceBatchesPrefetchCnt property
can be used here to reduce the delay between the send and receive steps
of message processing. The default value is more than enough for most
cases. The testing results showed only a 7-second (0.32%) delay during
a 40-minute cache rebalance procedure. So, each supply message is ready
to be sent when the next demand message arrives.
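
For reference, a minimal sketch of tuning these knobs on a cache
configuration (assuming the standard CacheConfiguration setters; the
values are only examples, not recommendations):

import org.apache.ignite.configuration.CacheConfiguration;

public class RebalanceTuning {
    static CacheConfiguration<Integer, byte[]> cacheCfg() {
        CacheConfiguration<Integer, byte[]> ccfg = new CacheConfiguration<>("test-cache");

        // Size of a single rebalance supply batch, in bytes.
        ccfg.setRebalanceBatchSize(512 * 1024);

        // Number of supply batches a supplier may prepare ahead of the demander's
        // request (the rebalanceBatchesPrefetchCnt property mentioned above).
        ccfg.setRebalanceBatchesPrefetchCount(4);

        return ccfg;
    }
}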


> 2) Is it correct that for user caches we process supply messages in a
> system pool? Did we consider moving it to striped pool? Because if all
> operations on a single partition is ordered, we may apply a number of
> critical optimizations - bypassing page cache and checkpointer for new
> entries, batched index updates, batched free list updates, etc.

I think the rebalance procedure should not cause thousands of messages
per second, so we don't need to move it to the striped pool. We should
have a limited, stable load on the cluster from the rebalancing
procedure. As for the second part, are you talking about the
thread-per-partition model? If yes, we have tickets for it [4],
[5], [6].

> 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> conditions when it could be disabled on supplier side?

Do you mean the demander side? Why should we try to disable it on the
supplier node? I do not take it into account at all because it can be
easily done (I suppose issue [1] is about it). But it doesn't help us
much with full cache relocation.

> 4) Most important - have we tried to profile plain single-threaded
> rebalance without concurrent write load? We need to have clear
> understanding on where time is spent - supplier/demander, cpu/network/disk,
> etc. Some Java tracing code should help.

I've updated some information about the profiling results on the
confluence page [8]. If you find that I've missed something or that the
information is unclear, please let me know and I will fix it.

> And one question regarding proposed implementation - how are we going to
> handle secondary indexes?

Thank you for pointing this out. Actually, the current IEP page
doesn't cover this case. I think we can schedule an index rebuild after
all partition files have been transferred. This approach was also
mentioned in issue [2].
Would that be correct?


[1] https://issues.apache.org/jira/browse/IGNITE-10505
[2] https://issues.apache.org/jira/browse/IGNITE-7934
[3] https://issues.apache.org/jira/browse/IGNITE-7935

[4] https://issues.apache.org/jira/browse/IGNITE-4682
[5] https://issues.apache.org/jira/browse/IGNITE-4506
[6] https://issues.apache.org/jira/browse/IGNITE-4680

[7] https://issues.apache.org/jira/browse/IGNITE-7027
[8] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
[9] https://issues.apache.org/jira/browse/IGNITE-9520
On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <vo...@gridgain.com> wrote:
>
> Maxim,
>
> Regarding MVCC - this is essentially a copy-on-write approach. New entry is
> created on every update. They are cleaned asynchronously by dedicated
> threads (aka "vacuum").
>
> I looked at the document you mentioned, thank you for pointing to it. But
> it doesn't answer all questions around existing design, and what I am
> trying to do is to get how deep do we understand current problems. It is
> very true that various subsystems, such as buffer managers, WALs,
> supporting sctructures, etc. incur very serious overhead. And when it comes
> to heavy operations implementors typically seek for a way to bypass as much
> components as possible, taking in count that different shortcuts lead to
> different types of side effects. And IMO our very important goal for now is
> to create space of possible improvements, and estimate their costs, risks
> and applicability for product's configuration space.
>
> Let me claridy several questions about current rebalance implementation, as
> I am not a big expert here.
> 1) Is it correct that supplier sends only one message for every demand
> message? If yes, then streaming should improve network utilization a lot.
> 2) Is it correct that for user caches we process supply messages in a
> system pool? Did we consider moving it to striped pool? Because if all
> operations on a single partition is ordered, we may apply a number of
> critical optimizations - bypassing page cache and checkpointer for new
> entries, batched index updates, batched free list updates, etc.
> 3) Seems that WAL should no longer be a problem for us [1]. What are exact
> conditions when it could be disabled on supplier side?
> 4) Most important - have we tried to profile plain single-threaded
> rebalance without concurrent write load? We need to have clear
> understanding on where time is spent - supplier/demander, cpu/network/disk,
> etc. Some Java tracing code should help.
>
> And one question regarding proposed implementation - how are we going to
> handle secondary indexes?
>
> [1] https://issues.apache.org/jira/browse/IGNITE-8017
>
>
> On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:
>
> > Eduard,
> >
> > Thank you very much for the discussion!
> >
> > Your algorithm looks much better for me too and easier to implement.
> > I'll update appropriate process points on IEP page of the proposed
> > rebalance procedure.
> > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > <ed...@gmail.com> wrote:
> > >
> > > So, after some discussion, I could describe another approach on how to
> > > build consistent partition on the fly.
> > >
> > > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > > 2. After checkpoint finish, we start sending partition file (without any
> > > lock) to the receiver from 0 to fixed size.
> > > 3. Next checkpoints if they detect that they would override some pages of
> > > transferring file should write the previous state of a page to a
> > dedicated
> > > file.
> > > So, we would have a list of pages written 1 by 1, page id is written in
> > the
> > > page itself so we could determine page index. Let's name it log.
> > > 4. When transfer finished checkpointer would stop updating log-file. Now
> > we
> > > are ready to send it to the receiver.
> > > 5. On receiver side we start merging the dirty partition file with log
> > > (updating it with pages from log-file).
> > >
> > > So, an advantage of this method:
> > > - checkpoint-thread work couldn't  increase more than twice;
> > > - checkpoint-threads shouldn't wait for anything;
> > > - in best case, we receive partition without any extra effort.
> > >
> > >
> > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > eduard.shangareev@gmail.com> wrote:
> > >
> > > > Maxim,
> > > >
> > > > I have looked through your algorithm of reading partition consistently.
> > > > And I have some questions/comments.
> > > >
> > > > 1. The algorithm requires heavy synchronization between
> > checkpoint-thread
> > > > and new-approach-rebalance-threads,
> > > > because you need strong guarantees to not start writing or reading to
> > > > chunk which was updated or started reading by the counterpart.
> > > >
> > > > 2. Also, if we have started transferring this chunk in original
> > partition
> > > > couldn't be updated by checkpoint-threads. They should wait for
> > transfer
> > > > finishing.
> > > >
> > > > 3. If sending is slow and partition is updated then in worst case
> > > > checkpoint-threads would create the whole copy of the partition.
> > > >
> > > > So, what we have:
> > > > -on every page write checkpoint-thread should synchronize with
> > > > new-approach-rebalance-threads;
> > > > -checkpoint-thread should do extra-work, sometimes this could be as
> > huge
> > > > as copying the whole partition.
> > > >
> > > >
> > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> > ilya.kasnacheev@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello!
> > > >>
> > > >> This proposal will also happily break my compression-with-dictionary
> > patch
> > > >> since it relies currently on only having local dictionaries.
> > > >>
> > > >> However, when you have compressed data, maybe speed boost is even
> > greater
> > > >> with your approach.
> > > >>
> > > >> Regards,
> > > >> --
> > > >> Ilya Kasnacheev
> > > >>
> > > >>
> > > >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <ma...@gmail.com>:
> > > >>
> > > >> > Igniters,
> > > >> >
> > > >> >
> > > >> > I'd like to take the next step of increasing the Apache Ignite with
> > > >> > enabled persistence rebalance speed. Currently, the rebalancing
> > > >> > procedure doesn't utilize the network and storage device throughout
> > to
> > > >> > its full extent even with enough meaningful values of
> > > >> > rebalanceThreadPoolSize property. As part of the previous discussion
> > > >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> > > >> > idea [3] of transferring cache partition files over the network.
> > > >> > From my point, the case to which this type of rebalancing procedure
> > > >> > can bring the most benefit – is adding a completely new node or set
> > of
> > > >> > new nodes to the cluster. Such a scenario implies fully relocation
> > of
> > > >> > cache partition files to the new node. To roughly estimate the
> > > >> > superiority of partition file transmitting over the network the
> > native
> > > >> > Linux scp\rsync commands can be used. My test environment showed the
> > > >> > result of the new approach as 270 MB/s vs the current 40 MB/s
> > > >> > single-threaded rebalance speed.
> > > >> >
> > > >> >
> > > >> > I've prepared the design document IEP-28 [4] and accumulated all the
> > > >> > process details of a new rebalance approach on that page. Below you
> > > >> > can find the most significant details of the new rebalance procedure
> > > >> > and components of the Apache Ignite which are proposed to change.
> > > >> >
> > > >> > Any feedback is very appreciated.
> > > >> >
> > > >> >
> > > >> > *PROCESS OVERVIEW*
> > > >> >
> > > >> > The whole process is described in terms of rebalancing single cache
> > > >> > group and partition files would be rebalanced one-by-one:
> > > >> >
> > > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > > >> > supplier node;
> > > >> > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> > > >> > starts the new checkpoint process;
> > > >> > 3. The supplier node creates empty the temporary cache partition
> > file
> > > >> > with .tmp postfix in the same cache persistence directory;
> > > >> > 4. The supplier node splits the whole cache partition file into
> > > >> > virtual chunks of predefined size (multiply to the PageMemory size);
> > > >> > 4.1. If the concurrent checkpoint thread determines the appropriate
> > > >> > cache partition file chunk and tries to flush dirty page to the
> > cache
> > > >> > partition file
> > > >> > 4.1.1. If rebalance chunk already transferred
> > > >> > 4.1.1.1. Flush the dirty page to the file;
> > > >> > 4.1.2. If rebalance chunk not transferred
> > > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > >> > 4.1.2.2. Flush the dirty page to the file;
> > > >> > 4.2. The node starts sending to the demander node each cache
> > partition
> > > >> > file chunk one by one using FileChannel#transferTo
> > > >> > 4.2.1. If the current chunk was modified by checkpoint thread – read
> > > >> > it from the temporary cache partition file;
> > > >> > 4.2.2. If the current chunk is not touched – read it from the
> > original
> > > >> > cache partition file;
> > > >> > 5. The demander node starts to listen to new pipe incoming
> > connections
> > > >> > from the supplier node on TcpCommunicationSpi;
> > > >> > 6. The demander node creates the temporary cache partition file with
> > > >> > .tmp postfix in the same cache persistence directory;
> > > >> > 7. The demander node receives each cache partition file chunk one
> > by one
> > > >> > 7.1. The node checks CRC for each PageMemory in the downloaded
> > chunk;
> > > >> > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > > >> > partition file position;
> > > >> > 8. When the demander node receives the whole cache partition file
> > > >> > 8.1. The node initializes received .tmp file as its appropriate
> > cache
> > > >> > partition file;
> > > >> > 8.2. Thread-per-partition begins to apply for data entries from the
> > > >> > beginning of WAL-temporary storage;
> > > >> > 8.3. All async operations corresponding to this partition file still
> > > >> > write to the end of temporary WAL;
> > > >> > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > > >> > 8.4.1. Start the first checkpoint;
> > > >> > 8.4.2. Wait for the first checkpoint ends and own the cache
> > partition;
> > > >> > 8.4.3. All operations now are switched to the partition file instead
> > > >> > of writing to the temporary WAL;
> > > >> > 8.4.4. Schedule the temporary WAL storage deletion;
> > > >> > 9. The supplier node deletes the temporary cache partition file;
> > > >> >
> > > >> >
> > > >> > *COMPONENTS TO CHANGE*
> > > >> >
> > > >> > CommunicationSpi
> > > >> >
> > > >> > To benefit from zero copy we must delegate the file transferring to
> > > >> > FileChannel#transferTo(long, long,
> > > >> > java.nio.channels.WritableByteChannel) because the fast path of
> > > >> > transferTo method is only executed if the destination buffer
> > inherits
> > > >> > from an internal JDK class.
> > > >> >
> > > >> > Preloader
> > > >> >
> > > >> > A new implementation of cache entries preloader assume to be done.
> > The
> > > >> > new implementation must send and receive cache partition files over
> > > >> > the CommunicationSpi channels by chunks of data with validation
> > > >> > received items. The new layer over the cache partition file must
> > > >> > support direct using of FileChannel#transferTo method over the
> > > >> > CommunicationSpi pipe connection. The connection bandwidth of the
> > > >> > cache partition file transfer must have the ability to be limited at
> > > >> > runtime.
> > > >> >
> > > >> > Checkpointer
> > > >> >
> > > >> > When the supplier node receives the cache partition file demand
> > > >> > request it will send the file over the CommunicationSpi. The cache
> > > >> > partition file can be concurrently updated by checkpoint thread
> > during
> > > >> > its transmission. To guarantee the file consistency Сheckpointer
> > must
> > > >> > use copy-on-write technique and save a copy of updated chunk into
> > the
> > > >> > temporary file.
> > > >> >
> > > >> > (new) Catch-up temporary WAL
> > > >> >
> > > >> > While the demander node is in the partition file transmission state
> > it
> > > >> > must save all cache entries corresponding to the moving partition
> > into
> > > >> > a new temporary WAL storage. These entries will be applied later one
> > > >> > by one on the received cache partition file. All asynchronous
> > > >> > operations will be enrolled to the end of temporary WAL storage
> > during
> > > >> > storage reads until it becomes fully read. The file-based FIFO
> > > >> > approach assumes to be used by this process.
> > > >> >
> > > >> >
> > > >> > *RECOVERY*
> > > >> >
> > > >> > In case of crash recovery, there is no additional actions need to be
> > > >> > applied to keep the cache partition file consistency. We are not
> > > >> > recovering partition with the moving state, thus the single
> > partition
> > > >> > file will be lost and only it. The uniqueness of it is guaranteed by
> > > >> > the single-file-transmission process. The cache partition file will
> > be
> > > >> > fully loaded on the next rebalance procedure.
> > > >> >
> > > >> > To provide default cluster recovery guarantee we must to:
> > > >> > 1. Start the checkpoint process when the temporary WAL storage
> > becomes
> > > >> > empty;
> > > >> > 2. Wait for the first checkpoint ends and set owning status to the
> > > >> > cache partition;
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > [1]
> > > >> >
> > > >>
> > http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > >> > [2]
> > > >> >
> > > >>
> > https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > >> > [4]
> > > >> >
> > > >>
> > https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > >> >
> > > >>
> > > >
> >

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Vladimir Ozerov <vo...@gridgain.com>.
Maxim,

Regarding MVCC - it is essentially a copy-on-write approach. A new entry is
created on every update. Old entries are cleaned asynchronously by dedicated
threads (aka "vacuum").

I looked at the document you mentioned, thank you for pointing to it. But
it doesn't answer all the questions around the existing design, and what I am
trying to do is to understand how deeply we understand the current problems.
It is very true that various subsystems, such as buffer managers, WALs,
supporting structures, etc., incur very serious overhead. And when it comes
to heavy operations, implementors typically look for a way to bypass as many
components as possible, taking into account that different shortcuts lead to
different types of side effects. And IMO our very important goal for now is
to map out the space of possible improvements and estimate their costs, risks
and applicability across the product's configuration space.

Let me clarify several questions about the current rebalance implementation,
as I am not a big expert here.
1) Is it correct that the supplier sends only one message for every demand
message? If yes, then streaming should improve network utilization a lot.
2) Is it correct that for user caches we process supply messages in the
system pool? Did we consider moving it to the striped pool? Because if all
operations on a single partition are ordered, we may apply a number of
critical optimizations - bypassing the page cache and checkpointer for new
entries, batched index updates, batched free list updates, etc.
3) It seems that WAL should no longer be a problem for us [1]. What are the
exact conditions under which it could be disabled on the supplier side?
4) Most important - have we tried to profile a plain single-threaded
rebalance without concurrent write load? We need to have a clear
understanding of where the time is spent - supplier/demander, cpu/network/disk,
etc. Some Java tracing code should help.

And one question regarding the proposed implementation - how are we going to
handle secondary indexes?

[1] https://issues.apache.org/jira/browse/IGNITE-8017


On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <ma...@gmail.com> wrote:

> Eduard,
>
> Thank you very much for the discussion!
>
> Your algorithm looks much better for me too and easier to implement.
> I'll update appropriate process points on IEP page of the proposed
> rebalance procedure.
> On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> <ed...@gmail.com> wrote:
> >
> > So, after some discussion, I could describe another approach on how to
> > build consistent partition on the fly.
> >
> > 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> > 2. After checkpoint finish, we start sending partition file (without any
> > lock) to the receiver from 0 to fixed size.
> > 3. Next checkpoints if they detect that they would override some pages of
> > transferring file should write the previous state of a page to a
> dedicated
> > file.
> > So, we would have a list of pages written 1 by 1, page id is written in
> the
> > page itself so we could determine page index. Let's name it log.
> > 4. When transfer finished checkpointer would stop updating log-file. Now
> we
> > are ready to send it to the receiver.
> > 5. On receiver side we start merging the dirty partition file with log
> > (updating it with pages from log-file).
> >
> > So, an advantage of this method:
> > - checkpoint-thread work couldn't  increase more than twice;
> > - checkpoint-threads shouldn't wait for anything;
> > - in best case, we receive partition without any extra effort.
> >
> >
> > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > eduard.shangareev@gmail.com> wrote:
> >
> > > Maxim,
> > >
> > > I have looked through your algorithm of reading partition consistently.
> > > And I have some questions/comments.
> > >
> > > 1. The algorithm requires heavy synchronization between
> checkpoint-thread
> > > and new-approach-rebalance-threads,
> > > because you need strong guarantees to not start writing or reading to
> > > chunk which was updated or started reading by the counterpart.
> > >
> > > 2. Also, if we have started transferring this chunk in original
> partition
> > > couldn't be updated by checkpoint-threads. They should wait for
> transfer
> > > finishing.
> > >
> > > 3. If sending is slow and partition is updated then in worst case
> > > checkpoint-threads would create the whole copy of the partition.
> > >
> > > So, what we have:
> > > -on every page write checkpoint-thread should synchronize with
> > > new-approach-rebalance-threads;
> > > -checkpoint-thread should do extra-work, sometimes this could be as
> huge
> > > as copying the whole partition.
> > >
> > >
> > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <
> ilya.kasnacheev@gmail.com>
> > > wrote:
> > >
> > >> Hello!
> > >>
> > >> This proposal will also happily break my compression-with-dictionary
> patch
> > >> since it relies currently on only having local dictionaries.
> > >>
> > >> However, when you have compressed data, maybe speed boost is even
> greater
> > >> with your approach.
> > >>
> > >> Regards,
> > >> --
> > >> Ilya Kasnacheev
> > >>
> > >>
> > >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <ma...@gmail.com>:
> > >>
> > >> > Igniters,
> > >> >
> > >> >
> > >> > I'd like to take the next step of increasing the Apache Ignite with
> > >> > enabled persistence rebalance speed. Currently, the rebalancing
> > >> > procedure doesn't utilize the network and storage device throughout
> to
> > >> > its full extent even with enough meaningful values of
> > >> > rebalanceThreadPoolSize property. As part of the previous discussion
> > >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> > >> > idea [3] of transferring cache partition files over the network.
> > >> > From my point, the case to which this type of rebalancing procedure
> > >> > can bring the most benefit – is adding a completely new node or set
> of
> > >> > new nodes to the cluster. Such a scenario implies fully relocation
> of
> > >> > cache partition files to the new node. To roughly estimate the
> > >> > superiority of partition file transmitting over the network the
> native
> > >> > Linux scp\rsync commands can be used. My test environment showed the
> > >> > result of the new approach as 270 MB/s vs the current 40 MB/s
> > >> > single-threaded rebalance speed.
> > >> >
> > >> >
> > >> > I've prepared the design document IEP-28 [4] and accumulated all the
> > >> > process details of a new rebalance approach on that page. Below you
> > >> > can find the most significant details of the new rebalance procedure
> > >> > and components of the Apache Ignite which are proposed to change.
> > >> >
> > >> > Any feedback is very appreciated.
> > >> >
> > >> >
> > >> > *PROCESS OVERVIEW*
> > >> >
> > >> > The whole process is described in terms of rebalancing single cache
> > >> > group and partition files would be rebalanced one-by-one:
> > >> >
> > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > >> > supplier node;
> > >> > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> > >> > starts the new checkpoint process;
> > >> > 3. The supplier node creates empty the temporary cache partition
> file
> > >> > with .tmp postfix in the same cache persistence directory;
> > >> > 4. The supplier node splits the whole cache partition file into
> > >> > virtual chunks of predefined size (multiply to the PageMemory size);
> > >> > 4.1. If the concurrent checkpoint thread determines the appropriate
> > >> > cache partition file chunk and tries to flush dirty page to the
> cache
> > >> > partition file
> > >> > 4.1.1. If rebalance chunk already transferred
> > >> > 4.1.1.1. Flush the dirty page to the file;
> > >> > 4.1.2. If rebalance chunk not transferred
> > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > >> > 4.1.2.2. Flush the dirty page to the file;
> > >> > 4.2. The node starts sending to the demander node each cache
> partition
> > >> > file chunk one by one using FileChannel#transferTo
> > >> > 4.2.1. If the current chunk was modified by checkpoint thread – read
> > >> > it from the temporary cache partition file;
> > >> > 4.2.2. If the current chunk is not touched – read it from the
> original
> > >> > cache partition file;
> > >> > 5. The demander node starts to listen to new pipe incoming
> connections
> > >> > from the supplier node on TcpCommunicationSpi;
> > >> > 6. The demander node creates the temporary cache partition file with
> > >> > .tmp postfix in the same cache persistence directory;
> > >> > 7. The demander node receives each cache partition file chunk one
> by one
> > >> > 7.1. The node checks CRC for each PageMemory in the downloaded
> chunk;
> > >> > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > >> > partition file position;
> > >> > 8. When the demander node receives the whole cache partition file
> > >> > 8.1. The node initializes received .tmp file as its appropriate
> cache
> > >> > partition file;
> > >> > 8.2. Thread-per-partition begins to apply for data entries from the
> > >> > beginning of WAL-temporary storage;
> > >> > 8.3. All async operations corresponding to this partition file still
> > >> > write to the end of temporary WAL;
> > >> > 8.4. At the moment of WAL-temporary storage is ready to be empty
> > >> > 8.4.1. Start the first checkpoint;
> > >> > 8.4.2. Wait for the first checkpoint ends and own the cache
> partition;
> > >> > 8.4.3. All operations now are switched to the partition file instead
> > >> > of writing to the temporary WAL;
> > >> > 8.4.4. Schedule the temporary WAL storage deletion;
> > >> > 9. The supplier node deletes the temporary cache partition file;
> > >> >
> > >> >
> > >> > *COMPONENTS TO CHANGE*
> > >> >
> > >> > CommunicationSpi
> > >> >
> > >> > To benefit from zero copy we must delegate the file transferring to
> > >> > FileChannel#transferTo(long, long,
> > >> > java.nio.channels.WritableByteChannel) because the fast path of
> > >> > transferTo method is only executed if the destination buffer
> inherits
> > >> > from an internal JDK class.
> > >> >
> > >> > Preloader
> > >> >
> > >> > A new implementation of cache entries preloader assume to be done.
> The
> > >> > new implementation must send and receive cache partition files over
> > >> > the CommunicationSpi channels by chunks of data with validation
> > >> > received items. The new layer over the cache partition file must
> > >> > support direct using of FileChannel#transferTo method over the
> > >> > CommunicationSpi pipe connection. The connection bandwidth of the
> > >> > cache partition file transfer must have the ability to be limited at
> > >> > runtime.
> > >> >
> > >> > Checkpointer
> > >> >
> > >> > When the supplier node receives the cache partition file demand
> > >> > request it will send the file over the CommunicationSpi. The cache
> > >> > partition file can be concurrently updated by checkpoint thread
> during
> > >> > its transmission. To guarantee the file consistency Сheckpointer
> must
> > >> > use copy-on-write technique and save a copy of updated chunk into
> the
> > >> > temporary file.
> > >> >
> > >> > (new) Catch-up temporary WAL
> > >> >
> > >> > While the demander node is in the partition file transmission state
> it
> > >> > must save all cache entries corresponding to the moving partition
> into
> > >> > a new temporary WAL storage. These entries will be applied later one
> > >> > by one on the received cache partition file. All asynchronous
> > >> > operations will be enrolled to the end of temporary WAL storage
> during
> > >> > storage reads until it becomes fully read. The file-based FIFO
> > >> > approach assumes to be used by this process.
> > >> >
> > >> >
> > >> > *RECOVERY*
> > >> >
> > >> > In case of crash recovery, there is no additional actions need to be
> > >> > applied to keep the cache partition file consistency. We are not
> > >> > recovering partition with the moving state, thus the single
> partition
> > >> > file will be lost and only it. The uniqueness of it is guaranteed by
> > >> > the single-file-transmission process. The cache partition file will
> be
> > >> > fully loaded on the next rebalance procedure.
> > >> >
> > >> > To provide default cluster recovery guarantee we must to:
> > >> > 1. Start the checkpoint process when the temporary WAL storage
> becomes
> > >> > empty;
> > >> > 2. Wait for the first checkpoint ends and set owning status to the
> > >> > cache partition;
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > [1]
> > >> >
> > >>
> http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > >> > [2]
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > >> > [4]
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > >> >
> > >>
> > >
>

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Maxim Muzafarov <ma...@gmail.com>.
Eduard,

Thank you very much for the discussion!

Your algorithm looks much better to me too and is easier to implement.
I'll update the appropriate process points on the IEP page of the proposed
rebalance procedure.
On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
<ed...@gmail.com> wrote:
>
> So, after some discussion, I could describe another approach on how to
> build consistent partition on the fly.
>
> 1. We make a checkpoint, fix the size of the partition in OffheapManager.
> 2. After checkpoint finish, we start sending partition file (without any
> lock) to the receiver from 0 to fixed size.
> 3. Next checkpoints if they detect that they would override some pages of
> transferring file should write the previous state of a page to a dedicated
> file.
> So, we would have a list of pages written 1 by 1, page id is written in the
> page itself so we could determine page index. Let's name it log.
> 4. When transfer finished checkpointer would stop updating log-file. Now we
> are ready to send it to the receiver.
> 5. On receiver side we start merging the dirty partition file with log
> (updating it with pages from log-file).
>
> So, an advantage of this method:
> - checkpoint-thread work couldn't  increase more than twice;
> - checkpoint-threads shouldn't wait for anything;
> - in best case, we receive partition without any extra effort.
>
>
> On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> eduard.shangareev@gmail.com> wrote:
>
> > Maxim,
> >
> > I have looked through your algorithm of reading partition consistently.
> > And I have some questions/comments.
> >
> > 1. The algorithm requires heavy synchronization between checkpoint-thread
> > and new-approach-rebalance-threads,
> > because you need strong guarantees to not start writing or reading to
> > chunk which was updated or started reading by the counterpart.
> >
> > 2. Also, if we have started transferring this chunk in original partition
> > couldn't be updated by checkpoint-threads. They should wait for transfer
> > finishing.
> >
> > 3. If sending is slow and partition is updated then in worst case
> > checkpoint-threads would create the whole copy of the partition.
> >
> > So, what we have:
> > -on every page write checkpoint-thread should synchronize with
> > new-approach-rebalance-threads;
> > -checkpoint-thread should do extra-work, sometimes this could be as huge
> > as copying the whole partition.
> >
> >
> > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <il...@gmail.com>
> > wrote:
> >
> >> Hello!
> >>
> >> This proposal will also happily break my compression-with-dictionary patch
> >> since it relies currently on only having local dictionaries.
> >>
> >> However, when you have compressed data, maybe speed boost is even greater
> >> with your approach.
> >>
> >> Regards,
> >> --
> >> Ilya Kasnacheev
> >>
> >>
> >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <ma...@gmail.com>:
> >>
> >> > Igniters,
> >> >
> >> >
> >> > I'd like to take the next step of increasing the Apache Ignite with
> >> > enabled persistence rebalance speed. Currently, the rebalancing
> >> > procedure doesn't utilize the network and storage device throughout to
> >> > its full extent even with enough meaningful values of
> >> > rebalanceThreadPoolSize property. As part of the previous discussion
> >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an
> >> > idea [3] of transferring cache partition files over the network.
> >> > From my point, the case to which this type of rebalancing procedure
> >> > can bring the most benefit – is adding a completely new node or set of
> >> > new nodes to the cluster. Such a scenario implies fully relocation of
> >> > cache partition files to the new node. To roughly estimate the
> >> > superiority of partition file transmitting over the network the native
> >> > Linux scp\rsync commands can be used. My test environment showed the
> >> > result of the new approach as 270 MB/s vs the current 40 MB/s
> >> > single-threaded rebalance speed.
> >> >
> >> >
> >> > I've prepared the design document IEP-28 [4] and accumulated all the
> >> > process details of a new rebalance approach on that page. Below you
> >> > can find the most significant details of the new rebalance procedure
> >> > and components of the Apache Ignite which are proposed to change.
> >> >
> >> > Any feedback is very appreciated.
> >> >
> >> >
> >> > *PROCESS OVERVIEW*
> >> >
> >> > The whole process is described in terms of rebalancing single cache
> >> > group and partition files would be rebalanced one-by-one:
> >> >
> >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> >> > supplier node;
> >> > 2. When the supplier node receives GridDhtPartitionDemandMessage and
> >> > starts the new checkpoint process;
> >> > 3. The supplier node creates empty the temporary cache partition file
> >> > with .tmp postfix in the same cache persistence directory;
> >> > 4. The supplier node splits the whole cache partition file into
> >> > virtual chunks of predefined size (multiply to the PageMemory size);
> >> > 4.1. If the concurrent checkpoint thread determines the appropriate
> >> > cache partition file chunk and tries to flush dirty page to the cache
> >> > partition file
> >> > 4.1.1. If rebalance chunk already transferred
> >> > 4.1.1.1. Flush the dirty page to the file;
> >> > 4.1.2. If rebalance chunk not transferred
> >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> >> > 4.1.2.2. Flush the dirty page to the file;
> >> > 4.2. The node starts sending to the demander node each cache partition
> >> > file chunk one by one using FileChannel#transferTo
> >> > 4.2.1. If the current chunk was modified by checkpoint thread – read
> >> > it from the temporary cache partition file;
> >> > 4.2.2. If the current chunk is not touched – read it from the original
> >> > cache partition file;
> >> > 5. The demander node starts to listen to new pipe incoming connections
> >> > from the supplier node on TcpCommunicationSpi;
> >> > 6. The demander node creates the temporary cache partition file with
> >> > .tmp postfix in the same cache persistence directory;
> >> > 7. The demander node receives each cache partition file chunk one by one
> >> > 7.1. The node checks CRC for each PageMemory in the downloaded chunk;
> >> > 7.2. The node flushes the downloaded chunk at the appropriate cache
> >> > partition file position;
> >> > 8. When the demander node receives the whole cache partition file
> >> > 8.1. The node initializes received .tmp file as its appropriate cache
> >> > partition file;

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Eduard Shangareev <ed...@gmail.com>.
So, after some discussion, I can describe another approach to building a
consistent partition on the fly.

1. We make a checkpoint and fix the size of the partition in OffheapManager.
2. After the checkpoint finishes, we start sending the partition file (without
any lock) to the receiver, from offset 0 up to the fixed size.
3. If subsequent checkpoints detect that they would overwrite some pages of
the file being transferred, they write the previous state of each such page to
a dedicated file. So, we get a list of pages written one by one; the page id
is stored in the page itself, so we can determine the page index. Let's name
this file the log.
4. When the transfer is finished, the checkpointer stops updating the log
file. Now we are ready to send it to the receiver.
5. On the receiver side we start merging the dirty partition file with the log
(updating it with pages from the log file); a sketch of this merge is given
below, after the list of advantages.

So, the advantages of this method are:
- the checkpoint threads' extra work cannot more than double;
- the checkpoint threads don't have to wait for anything;
- in the best case, we receive the partition without any extra effort.
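
To make step 5 concrete, here is a minimal sketch of the receiver-side merge
in plain java.nio. It is an illustration only: the fixed 4 KB page size, the
page id sitting in the first 8 bytes of a page, and the page index being the
low 32 bits of the page id are assumptions of this sketch, not details taken
from the Ignite code.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Merges the received ("dirty") partition file with the checkpoint log of page pre-images. */
public class PartitionLogMerge {
    private static final int PAGE_SIZE = 4096; // assumed page size

    public static void merge(Path partFile, Path logFile) throws IOException {
        ByteBuffer page = ByteBuffer.allocateDirect(PAGE_SIZE);

        try (FileChannel part = FileChannel.open(partFile, StandardOpenOption.WRITE);
             FileChannel log = FileChannel.open(logFile, StandardOpenOption.READ)) {
            while (log.position() < log.size()) {
                page.clear();

                // Read the next logged page (the pre-image saved by the checkpointer).
                while (page.hasRemaining() && log.read(page) > 0);
                page.flip();

                // The page id is written in the page itself; derive the page index from it
                // (low 32 bits here - an assumed encoding, for illustration only).
                long pageId = page.getLong(0);
                long pageIdx = pageId & 0xFFFFFFFFL;

                // Restore the pre-image at its offset in the received partition file.
                long off = pageIdx * PAGE_SIZE;
                while (page.hasRemaining())
                    off += part.write(page, off);
            }

            part.force(true);
        }
    }
}

The point is that the receiver can rewind the partition to the
checkpoint-consistent state purely locally, by replaying the log sequentially,
without any further coordination with the sender.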



Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Eduard Shangareev <ed...@gmail.com>.
Maxim,

I have looked through your algorithm for reading a partition consistently,
and I have some questions/comments.

1. The algorithm requires heavy synchronization between the checkpoint thread
and the new-approach rebalance threads, because you need strong guarantees
that neither side starts writing or reading a chunk which the counterpart has
already updated or started reading.

2. Also, once we have started transferring a chunk, it cannot be updated in
the original partition by checkpoint threads; they have to wait for the
transfer to finish.

3. If sending is slow and the partition is updated, then in the worst case the
checkpoint threads would create a copy of the whole partition.

So, what we have:
- on every page write, the checkpoint thread has to synchronize with the
new-approach rebalance threads;
- the checkpoint thread has to do extra work, which in some cases can be as
large as copying the whole partition.
A hypothetical sketch of the per-chunk coordination this implies is given
below.
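
For what it's worth, here is a rough, purely hypothetical sketch of the
per-chunk coordination points 1 and 2 imply. All names are invented for
illustration; nothing here comes from the Ignite code base.

import java.util.concurrent.locks.ReentrantLock;

/** Illustrates the per-chunk locking the original algorithm would need between
 *  the checkpoint thread and the rebalance (transfer) thread. */
public class ChunkSyncSketch {
    private final ReentrantLock[] locks;   // one lock per virtual chunk
    private final boolean[] transferred;   // has this chunk already been sent?

    public ChunkSyncSketch(int chunks) {
        locks = new ReentrantLock[chunks];
        transferred = new boolean[chunks];

        for (int i = 0; i < chunks; i++)
            locks[i] = new ReentrantLock();
    }

    /** Rebalance thread: while a chunk is being sent, checkpoint writes to it must wait. */
    public void sendChunk(int chunk, Runnable transfer) {
        locks[chunk].lock();

        try {
            transfer.run();
            transferred[chunk] = true;
        }
        finally {
            locks[chunk].unlock();
        }
    }

    /** Checkpoint thread: every single page write has to take the chunk lock first. */
    public void writePage(int chunk, Runnable copyToTempFile, Runnable flushToPartition) {
        locks[chunk].lock(); // blocks while the chunk is being transferred

        try {
            if (!transferred[chunk])
                copyToTempFile.run(); // copy-on-write of a not-yet-sent chunk

            flushToPartition.run();
        }
        finally {
            locks[chunk].unlock();
        }
    }
}

Even in this toy form, the lock sits on the hot path of every page write, and
the copy-on-write branch is where the extra work (up to a whole partition
copy) comes from.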



Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

This proposal will also happily break my compression-with-dictionary patch,
since it currently relies on having only local dictionaries.

However, when you have compressed data, maybe the speed boost is even greater
with your approach.

Regards,
-- 
Ilya Kasnacheev



Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Maxim Muzafarov <ma...@gmail.com>.
Vladimir,


> Another thing to consider is MVCC - under load partitions may have a lot of entries which are not visible to anyone and is about to be removed, so there is no need to transfer them at all.

Please share more details about how these entries occur. When do they become
invisible? From what I've found, they appear after entry deletion, right? It
looks like these entries will be gone once the checkpoint ends, but I may be
mistaken here. I'll try to take this into account and will update the IEP
page.


> Do we have any analysis on where time is really spent during current rebalance implementation?

Yes, I've collected some statistics about the rebalancing procedure and have
also tested it with the different rebalance properties available. I created
the wiki page [1] describing the limitations of the current rebalancing
procedure and the advantages of the new one. I have not yet published all the
measurements that I have, but please look at the graph placed on that page.
We have higher CPU consumption on the demander node than on the supplier
node, and this is without any additional load. I think it shows that saving
entries one by one is not the right strategy for cache data balancing.
Therefore, I think we have some options here (you already mentioned some of
them): batched entry processing, optimization of internal data structures, or
avoiding the per-entry work altogether by transferring the stored files. We
already have tickets for the fuzzy free list implementation [2] and for
batched entry processing [3]. Both back then and now, these changes look more
complex and risky to me (maybe I'm missing something and they are easier). I
think it is acceptable to start optimizing the cluster rebalancing procedure
from the persistence-enabled perspective by prototyping the proposed approach
(see the next comment for why).


> But it is rather complex, doesn't help in-memory mode, and will not work for other storage formats which we may potentially have in future (e.g. tablespace approach).

You are not entirely right here. Yes, this proposal is only for clusters with
persistence enabled, but don't consider these changes a huge monolithic
update. From my point of view, it is a set of independent features that will
give Apache Ignite a competitive advantage. For instance, the changes in the
Checkpointer will give us an opportunity to save data snapshots of the
persisted files under checkpoint at some point in time (over the network, or
as a direct copy to another file). Another example: the changes in
CommunicationSpi will allow us to open a channel connection between any pair
of nodes for any purpose, e.g. copying arbitrary files with a zero-copy
algorithm that barely consumes node CPU resources, or transferring any binary
data as well (a rough sketch of such a transfer is given below).
I've read your topic about moving cache groups to a tablespace and I like
this idea very much. I can say that the new "file-segment-extent" type of
storage organization would require changing only the Preloader implementation
(or writing another one for each type of storage organization); the other
parts of the current proposal will work right out of the box.
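
For illustration only, here is a minimal sketch of the kind of zero-copy file
transfer such a channel connection would enable, written with plain java.nio;
the actual CommunicationSpi wiring is out of scope here and all names are
hypothetical.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyFileSender {
    /** Streams a whole file to the peer via FileChannel#transferTo (no user-space copy). */
    public static void send(Path file, InetSocketAddress peer) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ);
             SocketChannel out = SocketChannel.open(peer)) {
            long pos = 0;
            long size = src.size();

            while (pos < size) {
                // transferTo may send less than requested, so loop until the file is done.
                pos += src.transferTo(pos, size - pos, out);
            }
        }
    }
}

A runtime bandwidth limit, as required for the Preloader, could then be added
simply by capping the per-call count and pacing this loop.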


I think we can get a huge rebalance speed improvement on very fast SSDs, even
more than with batched data processing on the demander side or a fuzzy free
list implementation. I'll try to prototype the current solution and recheck
all the measurements.

Please correct me where I am wrong.


[1] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
[2] https://issues.apache.org/jira/browse/IGNITE-9520
[3] https://issues.apache.org/jira/browse/IGNITE-7935

Re: [DISCUSSION] Design document. Rebalance caches by transferring partition files

Posted by Vladimir Ozerov <vo...@gridgain.com>.
Maxim,

Do we have any analysis of where time is really spent in the current
rebalance implementation? The proposed change may work very well, but it is
rather complex, doesn't help the in-memory mode, and will not work for other
storage formats which we may potentially have in the future (e.g. the
tablespace approach). Another thing to consider is MVCC: under load,
partitions may have a lot of entries which are not visible to anyone and are
about to be removed, so there is no need to transfer them at all.

Did we investigate any universal and less intrusive approaches to rebalance
speedup before that? For example:
- batched data block reads on the supplier;
- iteration over the partition rather than the cache data tree on the supplier;
- batched data saves on the demander;
- delayed free list and index rebuild on the demander.

Vladimir.
