You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bookkeeper.apache.org by Matteo Merli <mm...@apache.org> on 2018/10/10 19:51:08 UTC

[DISCUSSION] PIP 24: Simplify memory settings

(CCed dev@bookkeeper.apache.org since this proposal is referring to both
projects and I thought it might be good to have a single discussion thread)

Wiki page is available at:
https://github.com/apache/pulsar/wiki/PIP-24%3A-Simplify-memory-settings

Pasting here for convenience:

------------------------------------------------------------


## Motivation

Configuring the correct JVM memory settings and cache sizes for a Pulsar
cluster should be
simplified.

There are currently many knobs in Netty or JVM flags for different
components and while
with a good setup the systems is very stable, it's easy to setup
non-optimal configurations
which might result in OutOfMemory errors under load.

Ideally, there should be very minimal configuration required to bring up a
Pulsar cluster
that can work under a wide set of traffic loads. In any case, we should
prefer to automatically
fallback to slower alternatives, when possible, instead of throwing OOM
exceptions.

## Goals

 1. Default setting should allow Pulsar to use the all the memory as
configured on the JVM,
    irrespective of Direct vs Heap memory
 2. Automatically set the size of caches based on the amount of memory
available to the JVM
 3. Allow to disable pooling completely for environments where memory is
scarce
 4. Allow to configure different policies to have different fallback
options when the memory
    quotas are reached


## Changes

### Netty Allocator Wrapper

Create an allocator wrapper that can be configured with different
behaviors. This will be
using the regular `PooledByteBufAllocator` but will have a configuration
object to decide
what to do in particular moments. It will also serve as a way to group and
simplify all
the Netty allocator options which are currently spread across multiple
system properties,
for which the documentation is not easily searchable.

The wrapper will be configured and instantianted through a builder class:

```java
public interface ByteBufAllocatorBuilder {

    /**
     * Creates a new {@link ByteBufAllocatorBuilder}.
     */
    public static ByteBufAllocatorBuilder create() {
        return new ByteBufAllocatorBuilderImpl();
    }

    /**
     * Finalize the configured {@link ByteBufAllocator}
     */
    ByteBufAllocator build();

    /**
     * Specify a custom allocator where the allocation requests should be
forwarded to.
     *
     * <p>
     * Default is to used {@link PooledByteBufAllocator#DEFAULT} when
pooling is required or
     * {@link UnpooledByteBufAllocator} otherwise.
     */
    ByteBufAllocatorBuilder allocator(ByteBufAllocator allocator);

    /**
     * Define the memory pooling policy
     *
     * <p>
     * Default is {@link PoolingPolicy#PooledDirect}
     */
    ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);

    /**
     * Controls the amount of concurrency for the memory pool.
     *
     * <p>
     * Default is to have a number of allocator arenas equals to 2 * CPUS.
     * <p>
     * Decreasing this number will reduce the amount of memory overhead, at
the expense of increased allocation
     * contention.
     */
    ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);

    /**
     * Define the OutOfMemory handling policy
     *
     * <p>
     * Default is {@link OomPolicy#FallbackToHeap}
     */
    ByteBufAllocatorBuilder oomPolicy(OomPolicy policy);

    /**
     * Enable the leak detection for
     *
     * <p>
     * Default is {@link LeakDetectionPolicy#Disabled}
     */
    ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy
leakDetectionPolicy);
}
```

The policies are here defined:

```java
/**
 * Define a policy for allocating buffers
 */
public enum PoolingPolicy {

    /**
     * Allocate memory from JVM heap without any pooling.
     *
     * This option has the least overhead in terms of memory usage since
the memory will be automatically reclaimed by
     * the JVM GC but might impose a performance penalty at high throughput.
     */
    UnpooledHeap,

    /**
     * Use Direct memory for all buffers and pool the memory.
     *
     * Direct memory will avoid the overhead of JVM GC and most memory
copies when reading and writing to socket
     * channel.
     *
     * Pooling will add memory space overhead due to the fact that there
will be fragmentation in the allocator and that
     * threads will keep a portion of memory as thread-local to avoid
contention when possible.
     */
    PooledDirect
}

/**
 * Represents the action to take when it's not possible to allocate memory.
 */
public enum OomPolicy {

    /**
     * Throw regular OOM exception without taking addition actions
     */
    ThrowException,

    /**
     * If it's not possible to allocate a buffer from direct memory,
fallback to allocate an unpooled buffer from JVM
     * heap.
     *
     * This will help absorb memory allocation spikes because the heap
allocations will naturally slow down the process
     * and will result if full GC cleanup if the Heap itself is full.
     */
    FallbackToHeap,

    /**
     * If it's not possible to allocate memory, kill the JVM process so
that it can be restarted immediately.
     *
     */
    KillProcess,
}

/**
 * Define the policy for the Netty leak detector
 */
public enum LeakDetectionPolicy {

    /**
     * No leak detection and no overhead
     */
    Disabled,

    /**
     * Instruments 1% of the allocated buffer to track for leaks
     */
    Simple,

    /**
     * Instruments 1% of the allocated buffer to track for leaks, reporting
stack traces of places where the buffer was
     * used
     */
    Advanced,

    /**
     * Instruments 100% of the allocated buffer to track for leaks,
reporting stack traces of places where the buffer
     * was used. Introduce very significant overhead.
     */
    Paranoid,
}
```

It will be possible to create an allocator through the builder and then
pass it through
to Netty client/server or just directly allocate buffers.

```java
ByteBufAllocator allocator = ByteBufAllocatorBuilder.create()
        .poolingPolicy(PoolingPolicy.PooledDirect)
        .oomPolicy(OomPolicy.FallbackToHeap)
        .leakDetectionPolicy(LeakDetectionPolicy.Disabled)
        .build();
```

### Component changes

In addition to used the policies based allocator wrapper, each component
will have
additional changes.

#### Pulsar broker

Add configuration options in `broker.conf` to allow configuration of the
allocator. Eg.:

```properties
allocatorPoolingPolicy=PooledDirect
allocatorPoolingConcurrency=4
allocatorOomPolicy=FallbackToHeap
allocatorLeakDetectionPolicy=Disabled
```

##### Managed ledger cache

Currently, in Pulsar broker, the only memory pooled from the direct memory
region, in
addition to regular IO buffer is the ManagedLedgerCache. This cache is used
to dispatch
directly to consumers (once a message is persisted), avoiding reads from
bookies for
consumers that are caught up with producers.

By default, the managed ledger cache size will be set to 1/3rd of the total
available
direct memory (or heap if pooling is disabled).

The setting will be left empty to indicate the default dynamic behavior:

```
managedLedgerCacheSizeMb=
```

#### BookKeeper Client

Add options to configure the allocator in `ClientConfiguration` object.

#### Bookie

Add options to configure the allocator in `ServerConfiguration` object and
`bookkeeper.conf`.

By default, in Pulsar we configure BookKeeper to use DbLedgerStorage. This
storage
implementation has 2 main sources of memory allocations, the read and write
caches.

By default, the configured direct memory region will be divided into 3
portions:
 * IO buffers - (50% and max to 4GB)
 * Write cache - 25 %
 * Read cache - 25 %

If there is a lot of direct memory available, max 4GB will be assigned to
IO buffers and
the rest will be split between read and write caches.

This will still not take into account the memory used by RocksDB block
cache, since this
will be allocated from within the JNI library and not accounted for in JVM
heap or
direct memory regions.

The rule of thumb here would be to default to a size pegged to the direct
memory size,
say 1/5th of it.

#### Pulsar Client

Add options to configure allocator policies in `PulsarClientBuilder`.

Additionally, for `PulsarClient` we should be able to define a max amount
of memory
that a single instance is allowed to use.

This memory will be used when accumulating messages in the producers
pending messages
queue or consumer receiving queues.

When the assigned client memory is filled up, some actions will be taken:

 * For producer it would be the same as the producer queue full condition,
with either
   immediate send error or blocking behavior, depending on existing
configuration.
 * For consumers, the flow control mechanism will be slowed down, by not
asking the
   brokers for more messages, once the memory is full.

A reasonable default might be to use 64 MB per client instance, which will
be shared
across all producers consumers created by that instance.

-- 
Matteo Merli
<mm...@apache.org>

Re: [DISCUSSION] PIP 24: Simplify memory settings

Posted by Sijie Guo <gu...@gmail.com>.
On Wed, Oct 10, 2018 at 4:13 PM Matteo Merli <ma...@gmail.com> wrote:

> On Wed, Oct 10, 2018 at 4:03 PM Sijie Guo <gu...@gmail.com> wrote:
>
> > the proposal looks good to me.
> >
> > just out of curiosity, why PoolingPolicy only has `UnpooledHeap` and
> > `PooledDirect`?
> >
>
> I think these are the 2 that make sense (from my perspective):
>
>  1. Pooling heap memory still suffers from interactions with GC and will
> still involve copying the buffers multiple times when reading/writing on
> network.
>      Sure, it would reduce the allocation rate, but if one wants to pool
> memory it's preferable to use direct memory.
>
>  2. Using unpooled direct memory for each buffer would be very slow so not
> to be useful in practice
>


Make sense to me +1 then

>
>
> --
> Matteo Merli
> <mm...@apache.org>
>

Re: [DISCUSSION] PIP 24: Simplify memory settings

Posted by Sijie Guo <gu...@gmail.com>.
On Wed, Oct 10, 2018 at 4:13 PM Matteo Merli <ma...@gmail.com> wrote:

> On Wed, Oct 10, 2018 at 4:03 PM Sijie Guo <gu...@gmail.com> wrote:
>
> > the proposal looks good to me.
> >
> > just out of curiosity, why PoolingPolicy only has `UnpooledHeap` and
> > `PooledDirect`?
> >
>
> I think these are the 2 that make sense (from my perspective):
>
>  1. Pooling heap memory still suffers from interactions with GC and will
> still involve copying the buffers multiple times when reading/writing on
> network.
>      Sure, it would reduce the allocation rate, but if one wants to pool
> memory it's preferable to use direct memory.
>
>  2. Using unpooled direct memory for each buffer would be very slow so not
> to be useful in practice
>


Make sense to me +1 then

>
>
> --
> Matteo Merli
> <mm...@apache.org>
>

Re: [DISCUSSION] PIP 24: Simplify memory settings

Posted by Matteo Merli <ma...@gmail.com>.
On Wed, Oct 10, 2018 at 4:03 PM Sijie Guo <gu...@gmail.com> wrote:

> the proposal looks good to me.
>
> just out of curiosity, why PoolingPolicy only has `UnpooledHeap` and
> `PooledDirect`?
>

I think these are the 2 that make sense (from my perspective):

 1. Pooling heap memory still suffers from interactions with GC and will
still involve copying the buffers multiple times when reading/writing on
network.
     Sure, it would reduce the allocation rate, but if one wants to pool
memory it's preferable to use direct memory.

 2. Using unpooled direct memory for each buffer would be very slow so not
to be useful in practice


-- 
Matteo Merli
<mm...@apache.org>

Re: [DISCUSSION] PIP 24: Simplify memory settings

Posted by Matteo Merli <ma...@gmail.com>.
On Wed, Oct 10, 2018 at 4:03 PM Sijie Guo <gu...@gmail.com> wrote:

> the proposal looks good to me.
>
> just out of curiosity, why PoolingPolicy only has `UnpooledHeap` and
> `PooledDirect`?
>

I think these are the 2 that make sense (from my perspective):

 1. Pooling heap memory still suffers from interactions with GC and will
still involve copying the buffers multiple times when reading/writing on
network.
     Sure, it would reduce the allocation rate, but if one wants to pool
memory it's preferable to use direct memory.

 2. Using unpooled direct memory for each buffer would be very slow so not
to be useful in practice


-- 
Matteo Merli
<mm...@apache.org>

Re: [DISCUSSION] PIP 24: Simplify memory settings

Posted by Sijie Guo <gu...@gmail.com>.
the proposal looks good to me.

just out of curiosity, why PoolingPolicy only has `UnpooledHeap` and
`PooledDirect`?

- Sijie

On Wed, Oct 10, 2018 at 12:51 PM Matteo Merli <mm...@apache.org> wrote:

> (CCed dev@bookkeeper.apache.org since this proposal is referring to both
> projects and I thought it might be good to have a single discussion thread)
>
> Wiki page is available at:
> https://github.com/apache/pulsar/wiki/PIP-24%3A-Simplify-memory-settings
>
> Pasting here for convenience:
>
> ------------------------------------------------------------
>
>
> ## Motivation
>
> Configuring the correct JVM memory settings and cache sizes for a Pulsar
> cluster should be
> simplified.
>
> There are currently many knobs in Netty or JVM flags for different
> components and while
> with a good setup the systems is very stable, it's easy to setup
> non-optimal configurations
> which might result in OutOfMemory errors under load.
>
> Ideally, there should be very minimal configuration required to bring up a
> Pulsar cluster
> that can work under a wide set of traffic loads. In any case, we should
> prefer to automatically
> fallback to slower alternatives, when possible, instead of throwing OOM
> exceptions.
>
> ## Goals
>
>  1. Default setting should allow Pulsar to use the all the memory as
> configured on the JVM,
>     irrespective of Direct vs Heap memory
>  2. Automatically set the size of caches based on the amount of memory
> available to the JVM
>  3. Allow to disable pooling completely for environments where memory is
> scarce
>  4. Allow to configure different policies to have different fallback
> options when the memory
>     quotas are reached
>
>
> ## Changes
>
> ### Netty Allocator Wrapper
>
> Create an allocator wrapper that can be configured with different
> behaviors. This will be
> using the regular `PooledByteBufAllocator` but will have a configuration
> object to decide
> what to do in particular moments. It will also serve as a way to group and
> simplify all
> the Netty allocator options which are currently spread across multiple
> system properties,
> for which the documentation is not easily searchable.
>
> The wrapper will be configured and instantianted through a builder class:
>
> ```java
> public interface ByteBufAllocatorBuilder {
>
>     /**
>      * Creates a new {@link ByteBufAllocatorBuilder}.
>      */
>     public static ByteBufAllocatorBuilder create() {
>         return new ByteBufAllocatorBuilderImpl();
>     }
>
>     /**
>      * Finalize the configured {@link ByteBufAllocator}
>      */
>     ByteBufAllocator build();
>
>     /**
>      * Specify a custom allocator where the allocation requests should be
> forwarded to.
>      *
>      * <p>
>      * Default is to used {@link PooledByteBufAllocator#DEFAULT} when
> pooling is required or
>      * {@link UnpooledByteBufAllocator} otherwise.
>      */
>     ByteBufAllocatorBuilder allocator(ByteBufAllocator allocator);
>
>     /**
>      * Define the memory pooling policy
>      *
>      * <p>
>      * Default is {@link PoolingPolicy#PooledDirect}
>      */
>     ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
>
>     /**
>      * Controls the amount of concurrency for the memory pool.
>      *
>      * <p>
>      * Default is to have a number of allocator arenas equals to 2 * CPUS.
>      * <p>
>      * Decreasing this number will reduce the amount of memory overhead, at
> the expense of increased allocation
>      * contention.
>      */
>     ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
>
>     /**
>      * Define the OutOfMemory handling policy
>      *
>      * <p>
>      * Default is {@link OomPolicy#FallbackToHeap}
>      */
>     ByteBufAllocatorBuilder oomPolicy(OomPolicy policy);
>
>     /**
>      * Enable the leak detection for
>      *
>      * <p>
>      * Default is {@link LeakDetectionPolicy#Disabled}
>      */
>     ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy
> leakDetectionPolicy);
> }
> ```
>
> The policies are here defined:
>
> ```java
> /**
>  * Define a policy for allocating buffers
>  */
> public enum PoolingPolicy {
>
>     /**
>      * Allocate memory from JVM heap without any pooling.
>      *
>      * This option has the least overhead in terms of memory usage since
> the memory will be automatically reclaimed by
>      * the JVM GC but might impose a performance penalty at high
> throughput.
>      */
>     UnpooledHeap,
>
>     /**
>      * Use Direct memory for all buffers and pool the memory.
>      *
>      * Direct memory will avoid the overhead of JVM GC and most memory
> copies when reading and writing to socket
>      * channel.
>      *
>      * Pooling will add memory space overhead due to the fact that there
> will be fragmentation in the allocator and that
>      * threads will keep a portion of memory as thread-local to avoid
> contention when possible.
>      */
>     PooledDirect
> }
>
> /**
>  * Represents the action to take when it's not possible to allocate memory.
>  */
> public enum OomPolicy {
>
>     /**
>      * Throw regular OOM exception without taking addition actions
>      */
>     ThrowException,
>
>     /**
>      * If it's not possible to allocate a buffer from direct memory,
> fallback to allocate an unpooled buffer from JVM
>      * heap.
>      *
>      * This will help absorb memory allocation spikes because the heap
> allocations will naturally slow down the process
>      * and will result if full GC cleanup if the Heap itself is full.
>      */
>     FallbackToHeap,
>
>     /**
>      * If it's not possible to allocate memory, kill the JVM process so
> that it can be restarted immediately.
>      *
>      */
>     KillProcess,
> }
>
> /**
>  * Define the policy for the Netty leak detector
>  */
> public enum LeakDetectionPolicy {
>
>     /**
>      * No leak detection and no overhead
>      */
>     Disabled,
>
>     /**
>      * Instruments 1% of the allocated buffer to track for leaks
>      */
>     Simple,
>
>     /**
>      * Instruments 1% of the allocated buffer to track for leaks, reporting
> stack traces of places where the buffer was
>      * used
>      */
>     Advanced,
>
>     /**
>      * Instruments 100% of the allocated buffer to track for leaks,
> reporting stack traces of places where the buffer
>      * was used. Introduce very significant overhead.
>      */
>     Paranoid,
> }
> ```
>
> It will be possible to create an allocator through the builder and then
> pass it through
> to Netty client/server or just directly allocate buffers.
>
> ```java
> ByteBufAllocator allocator = ByteBufAllocatorBuilder.create()
>         .poolingPolicy(PoolingPolicy.PooledDirect)
>         .oomPolicy(OomPolicy.FallbackToHeap)
>         .leakDetectionPolicy(LeakDetectionPolicy.Disabled)
>         .build();
> ```
>
> ### Component changes
>
> In addition to used the policies based allocator wrapper, each component
> will have
> additional changes.
>
> #### Pulsar broker
>
> Add configuration options in `broker.conf` to allow configuration of the
> allocator. Eg.:
>
> ```properties
> allocatorPoolingPolicy=PooledDirect
> allocatorPoolingConcurrency=4
> allocatorOomPolicy=FallbackToHeap
> allocatorLeakDetectionPolicy=Disabled
> ```
>
> ##### Managed ledger cache
>
> Currently, in Pulsar broker, the only memory pooled from the direct memory
> region, in
> addition to regular IO buffer is the ManagedLedgerCache. This cache is used
> to dispatch
> directly to consumers (once a message is persisted), avoiding reads from
> bookies for
> consumers that are caught up with producers.
>
> By default, the managed ledger cache size will be set to 1/3rd of the total
> available
> direct memory (or heap if pooling is disabled).
>
> The setting will be left empty to indicate the default dynamic behavior:
>
> ```
> managedLedgerCacheSizeMb=
> ```
>
> #### BookKeeper Client
>
> Add options to configure the allocator in `ClientConfiguration` object.
>
> #### Bookie
>
> Add options to configure the allocator in `ServerConfiguration` object and
> `bookkeeper.conf`.
>
> By default, in Pulsar we configure BookKeeper to use DbLedgerStorage. This
> storage
> implementation has 2 main sources of memory allocations, the read and write
> caches.
>
> By default, the configured direct memory region will be divided into 3
> portions:
>  * IO buffers - (50% and max to 4GB)
>  * Write cache - 25 %
>  * Read cache - 25 %
>
> If there is a lot of direct memory available, max 4GB will be assigned to
> IO buffers and
> the rest will be split between read and write caches.
>
> This will still not take into account the memory used by RocksDB block
> cache, since this
> will be allocated from within the JNI library and not accounted for in JVM
> heap or
> direct memory regions.
>
> The rule of thumb here would be to default to a size pegged to the direct
> memory size,
> say 1/5th of it.
>
> #### Pulsar Client
>
> Add options to configure allocator policies in `PulsarClientBuilder`.
>
> Additionally, for `PulsarClient` we should be able to define a max amount
> of memory
> that a single instance is allowed to use.
>
> This memory will be used when accumulating messages in the producers
> pending messages
> queue or consumer receiving queues.
>
> When the assigned client memory is filled up, some actions will be taken:
>
>  * For producer it would be the same as the producer queue full condition,
> with either
>    immediate send error or blocking behavior, depending on existing
> configuration.
>  * For consumers, the flow control mechanism will be slowed down, by not
> asking the
>    brokers for more messages, once the memory is full.
>
> A reasonable default might be to use 64 MB per client instance, which will
> be shared
> across all producers consumers created by that instance.
>
> --
> Matteo Merli
> <mm...@apache.org>
>

Re: [DISCUSSION] PIP 24: Simplify memory settings

Posted by Sijie Guo <gu...@gmail.com>.
the proposal looks good to me.

just out of curiosity, why PoolingPolicy only has `UnpooledHeap` and
`PooledDirect`?

- Sijie

On Wed, Oct 10, 2018 at 12:51 PM Matteo Merli <mm...@apache.org> wrote:

> (CCed dev@bookkeeper.apache.org since this proposal is referring to both
> projects and I thought it might be good to have a single discussion thread)
>
> Wiki page is available at:
> https://github.com/apache/pulsar/wiki/PIP-24%3A-Simplify-memory-settings
>
> Pasting here for convenience:
>
> ------------------------------------------------------------
>
>
> ## Motivation
>
> Configuring the correct JVM memory settings and cache sizes for a Pulsar
> cluster should be
> simplified.
>
> There are currently many knobs in Netty or JVM flags for different
> components and while
> with a good setup the systems is very stable, it's easy to setup
> non-optimal configurations
> which might result in OutOfMemory errors under load.
>
> Ideally, there should be very minimal configuration required to bring up a
> Pulsar cluster
> that can work under a wide set of traffic loads. In any case, we should
> prefer to automatically
> fallback to slower alternatives, when possible, instead of throwing OOM
> exceptions.
>
> ## Goals
>
>  1. Default setting should allow Pulsar to use the all the memory as
> configured on the JVM,
>     irrespective of Direct vs Heap memory
>  2. Automatically set the size of caches based on the amount of memory
> available to the JVM
>  3. Allow to disable pooling completely for environments where memory is
> scarce
>  4. Allow to configure different policies to have different fallback
> options when the memory
>     quotas are reached
>
>
> ## Changes
>
> ### Netty Allocator Wrapper
>
> Create an allocator wrapper that can be configured with different
> behaviors. This will be
> using the regular `PooledByteBufAllocator` but will have a configuration
> object to decide
> what to do in particular moments. It will also serve as a way to group and
> simplify all
> the Netty allocator options which are currently spread across multiple
> system properties,
> for which the documentation is not easily searchable.
>
> The wrapper will be configured and instantianted through a builder class:
>
> ```java
> public interface ByteBufAllocatorBuilder {
>
>     /**
>      * Creates a new {@link ByteBufAllocatorBuilder}.
>      */
>     public static ByteBufAllocatorBuilder create() {
>         return new ByteBufAllocatorBuilderImpl();
>     }
>
>     /**
>      * Finalize the configured {@link ByteBufAllocator}
>      */
>     ByteBufAllocator build();
>
>     /**
>      * Specify a custom allocator where the allocation requests should be
> forwarded to.
>      *
>      * <p>
>      * Default is to used {@link PooledByteBufAllocator#DEFAULT} when
> pooling is required or
>      * {@link UnpooledByteBufAllocator} otherwise.
>      */
>     ByteBufAllocatorBuilder allocator(ByteBufAllocator allocator);
>
>     /**
>      * Define the memory pooling policy
>      *
>      * <p>
>      * Default is {@link PoolingPolicy#PooledDirect}
>      */
>     ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
>
>     /**
>      * Controls the amount of concurrency for the memory pool.
>      *
>      * <p>
>      * Default is to have a number of allocator arenas equals to 2 * CPUS.
>      * <p>
>      * Decreasing this number will reduce the amount of memory overhead, at
> the expense of increased allocation
>      * contention.
>      */
>     ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
>
>     /**
>      * Define the OutOfMemory handling policy
>      *
>      * <p>
>      * Default is {@link OomPolicy#FallbackToHeap}
>      */
>     ByteBufAllocatorBuilder oomPolicy(OomPolicy policy);
>
>     /**
>      * Enable the leak detection for
>      *
>      * <p>
>      * Default is {@link LeakDetectionPolicy#Disabled}
>      */
>     ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy
> leakDetectionPolicy);
> }
> ```
>
> The policies are here defined:
>
> ```java
> /**
>  * Define a policy for allocating buffers
>  */
> public enum PoolingPolicy {
>
>     /**
>      * Allocate memory from JVM heap without any pooling.
>      *
>      * This option has the least overhead in terms of memory usage since
> the memory will be automatically reclaimed by
>      * the JVM GC but might impose a performance penalty at high
> throughput.
>      */
>     UnpooledHeap,
>
>     /**
>      * Use Direct memory for all buffers and pool the memory.
>      *
>      * Direct memory will avoid the overhead of JVM GC and most memory
> copies when reading and writing to socket
>      * channel.
>      *
>      * Pooling will add memory space overhead due to the fact that there
> will be fragmentation in the allocator and that
>      * threads will keep a portion of memory as thread-local to avoid
> contention when possible.
>      */
>     PooledDirect
> }
>
> /**
>  * Represents the action to take when it's not possible to allocate memory.
>  */
> public enum OomPolicy {
>
>     /**
>      * Throw regular OOM exception without taking addition actions
>      */
>     ThrowException,
>
>     /**
>      * If it's not possible to allocate a buffer from direct memory,
> fallback to allocate an unpooled buffer from JVM
>      * heap.
>      *
>      * This will help absorb memory allocation spikes because the heap
> allocations will naturally slow down the process
>      * and will result if full GC cleanup if the Heap itself is full.
>      */
>     FallbackToHeap,
>
>     /**
>      * If it's not possible to allocate memory, kill the JVM process so
> that it can be restarted immediately.
>      *
>      */
>     KillProcess,
> }
>
> /**
>  * Define the policy for the Netty leak detector
>  */
> public enum LeakDetectionPolicy {
>
>     /**
>      * No leak detection and no overhead
>      */
>     Disabled,
>
>     /**
>      * Instruments 1% of the allocated buffer to track for leaks
>      */
>     Simple,
>
>     /**
>      * Instruments 1% of the allocated buffer to track for leaks, reporting
> stack traces of places where the buffer was
>      * used
>      */
>     Advanced,
>
>     /**
>      * Instruments 100% of the allocated buffer to track for leaks,
> reporting stack traces of places where the buffer
>      * was used. Introduce very significant overhead.
>      */
>     Paranoid,
> }
> ```
>
> It will be possible to create an allocator through the builder and then
> pass it through
> to Netty client/server or just directly allocate buffers.
>
> ```java
> ByteBufAllocator allocator = ByteBufAllocatorBuilder.create()
>         .poolingPolicy(PoolingPolicy.PooledDirect)
>         .oomPolicy(OomPolicy.FallbackToHeap)
>         .leakDetectionPolicy(LeakDetectionPolicy.Disabled)
>         .build();
> ```
>
> ### Component changes
>
> In addition to used the policies based allocator wrapper, each component
> will have
> additional changes.
>
> #### Pulsar broker
>
> Add configuration options in `broker.conf` to allow configuration of the
> allocator. Eg.:
>
> ```properties
> allocatorPoolingPolicy=PooledDirect
> allocatorPoolingConcurrency=4
> allocatorOomPolicy=FallbackToHeap
> allocatorLeakDetectionPolicy=Disabled
> ```
>
> ##### Managed ledger cache
>
> Currently, in Pulsar broker, the only memory pooled from the direct memory
> region, in
> addition to regular IO buffer is the ManagedLedgerCache. This cache is used
> to dispatch
> directly to consumers (once a message is persisted), avoiding reads from
> bookies for
> consumers that are caught up with producers.
>
> By default, the managed ledger cache size will be set to 1/3rd of the total
> available
> direct memory (or heap if pooling is disabled).
>
> The setting will be left empty to indicate the default dynamic behavior:
>
> ```
> managedLedgerCacheSizeMb=
> ```
>
> #### BookKeeper Client
>
> Add options to configure the allocator in `ClientConfiguration` object.
>
> #### Bookie
>
> Add options to configure the allocator in `ServerConfiguration` object and
> `bookkeeper.conf`.
>
> By default, in Pulsar we configure BookKeeper to use DbLedgerStorage. This
> storage
> implementation has 2 main sources of memory allocations, the read and write
> caches.
>
> By default, the configured direct memory region will be divided into 3
> portions:
>  * IO buffers - (50% and max to 4GB)
>  * Write cache - 25 %
>  * Read cache - 25 %
>
> If there is a lot of direct memory available, max 4GB will be assigned to
> IO buffers and
> the rest will be split between read and write caches.
>
> This will still not take into account the memory used by RocksDB block
> cache, since this
> will be allocated from within the JNI library and not accounted for in JVM
> heap or
> direct memory regions.
>
> The rule of thumb here would be to default to a size pegged to the direct
> memory size,
> say 1/5th of it.
>
> #### Pulsar Client
>
> Add options to configure allocator policies in `PulsarClientBuilder`.
>
> Additionally, for `PulsarClient` we should be able to define a max amount
> of memory
> that a single instance is allowed to use.
>
> This memory will be used when accumulating messages in the producers
> pending messages
> queue or consumer receiving queues.
>
> When the assigned client memory is filled up, some actions will be taken:
>
>  * For producer it would be the same as the producer queue full condition,
> with either
>    immediate send error or blocking behavior, depending on existing
> configuration.
>  * For consumers, the flow control mechanism will be slowed down, by not
> asking the
>    brokers for more messages, once the memory is full.
>
> A reasonable default might be to use 64 MB per client instance, which will
> be shared
> across all producers consumers created by that instance.
>
> --
> Matteo Merli
> <mm...@apache.org>
>