Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/10 19:42:45 UTC

[pulsar.wiki] branch master updated: Created PIP-24: Simplify memory settings (markdown)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new edc57b3  Created PIP-24: Simplify memory settings (markdown)
edc57b3 is described below

commit edc57b31c018ec28efe93474a6b9012553e01ff9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 10 12:42:44 2018 -0700

    Created PIP-24: Simplify memory settings (markdown)
---
 PIP-24:-Simplify-memory-settings.md | 273 ++++++++++++++++++++++++++++++++++++
 1 file changed, 273 insertions(+)

diff --git a/PIP-24:-Simplify-memory-settings.md b/PIP-24:-Simplify-memory-settings.md
new file mode 100644
index 0000000..41f8b8b
--- /dev/null
+++ b/PIP-24:-Simplify-memory-settings.md
@@ -0,0 +1,273 @@
+* **Status**: Proposal
+* **Author**: [Matteo Merli](https://github.com/merlimat)
+* **Pull Request**:
+* **Mailing List discussion**:
+
+
+## Motivation
+
+Configuring the correct JVM memory settings and cache sizes for a Pulsar cluster should be
+simplified.
+
+There are currently many knobs in Netty and JVM flags for the different components, and while
+the system is very stable with a good setup, it is easy to end up with a non-optimal configuration
+that can result in OutOfMemory errors under load.
+
+Ideally, very minimal configuration should be required to bring up a Pulsar cluster
+that can work under a wide set of traffic loads. In any case, we should prefer to automatically
+fall back to slower alternatives, when possible, instead of throwing OOM exceptions.
+
+## Goals
+
+ 1. The default settings should allow Pulsar to use all the memory configured on the JVM,
+    irrespective of direct vs. heap memory
+ 1. Automatically set the size of caches based on the amount of memory available to the JVM
+ 1. Allow pooling to be disabled completely for environments where memory is scarce
+ 1. Allow different policies to be configured so that there are different fallback options when
+    the memory quotas are reached
+
+
+## Changes
+
+### Netty Allocator Wrapper
+
+Create an allocator wrapper that can be configured with different behaviors. It will use
+the regular `PooledByteBufAllocator`, but will have a configuration object that decides
+what to do in particular situations. It will also serve as a way to group and simplify all
+the Netty allocator options, which are currently spread across multiple system properties
+for which the documentation is not easily searchable.
+
+The wrapper will be configured and instantiated through a builder class:
+
+```java
+public interface ByteBufAllocatorBuilder {
+
+    /**
+     * Creates a new {@link ByteBufAllocatorBuilder}.
+     */
+    public static ByteBufAllocatorBuilder create() {
+        return new ByteBufAllocatorBuilderImpl();
+    }
+
+    /**
+     * Finalize the configured {@link ByteBufAllocator}
+     */
+    ByteBufAllocator build();
+
+    /**
+     * Specify a custom allocator to which the allocation requests should be forwarded.
+     *
+     * <p>
+     * Default is to use {@link PooledByteBufAllocator#DEFAULT} when pooling is required, or
+     * {@link UnpooledByteBufAllocator} otherwise.
+     */
+    ByteBufAllocatorBuilder allocator(ByteBufAllocator allocator);
+
+    /**
+     * Define the memory pooling policy
+     *
+     * <p>
+     * Default is {@link PoolingPolicy#PooledDirect}
+     */
+    ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
+
+    /**
+     * Controls the amount of concurrency for the memory pool.
+     *
+     * <p>
+     * Default is to have a number of allocator arenas equal to 2 * CPUS.
+     * <p>
+     * Decreasing this number will reduce the amount of memory overhead, at the expense of increased allocation
+     * contention.
+     */
+    ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
+
+    /**
+     * Define the OutOfMemory handling policy
+     *
+     * <p>
+     * Default is {@link OomPolicy#FallbackToHeap}
+     */
+    ByteBufAllocatorBuilder oomPolicy(OomPolicy policy);
+
+    /**
+     * Define the leak detection policy.
+     *
+     * <p>
+     * Default is {@link LeakDetectionPolicy#Disabled}
+     */
+    ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);
+}
+```
+
+The policies are defined as follows:
+
+```java
+/**
+ * Define a policy for allocating buffers
+ */
+public enum PoolingPolicy {
+
+    /**
+     * Allocate memory from JVM heap without any pooling.
+     *
+     * This option has the least overhead in terms of memory usage since the memory will be automatically reclaimed by
+     * the JVM GC but might impose a performance penalty at high throughput.
+     */
+    UnpooledHeap,
+
+    /**
+     * Use Direct memory for all buffers and pool the memory.
+     *
+     * Direct memory will avoid the overhead of JVM GC and most memory copies when reading from and writing to the
+     * socket channel.
+     *
+     * Pooling will add memory space overhead because there will be fragmentation in the allocator and because
+     * threads will keep a portion of memory as thread-local to avoid contention when possible.
+     */
+    PooledDirect
+}
+
+/**
+ * Represents the action to take when it's not possible to allocate memory.
+ */
+public enum OomPolicy {
+
+    /**
+     * Throw a regular OOM exception without taking additional actions
+     */
+    ThrowException,
+
+    /**
+     * If it's not possible to allocate a buffer from direct memory, fall back to allocating an unpooled buffer from
+     * the JVM heap.
+     *
+     * This will help absorb memory allocation spikes, because the heap allocations will naturally slow down the
+     * process and will result in a full GC cleanup if the heap itself is full.
+     */
+    FallbackToHeap,
+
+    /**
+     * If it's not possible to allocate memory, kill the JVM process so that it can be restarted immediately.
+     */
+    KillProcess,
+}
+
+/**
+ * Define the policy for the Netty leak detector
+ */
+public enum LeakDetectionPolicy {
+
+    /**
+     * No leak detection and no overhead
+     */
+    Disabled,
+
+    /**
+     * Instruments 1% of the allocated buffers to track leaks
+     */
+    Simple,
+
+    /**
+     * Instruments 1% of the allocated buffers to track leaks, reporting stack traces of the places where the
+     * buffers were used
+     */
+    Advanced,
+
+    /**
+     * Instruments 100% of the allocated buffers to track leaks, reporting stack traces of the places where the
+     * buffers were used. Introduces very significant overhead.
+     */
+    Paranoid,
+}
+```
+
+It will be possible to create an allocator through the builder and then pass it
+to the Netty client/server, or just allocate buffers from it directly.
+
+```java
+ByteBufAllocator allocator = ByteBufAllocatorBuilder.create()
+        .poolingPolicy(PoolingPolicy.PooledDirect)
+        .oomPolicy(OomPolicy.FallbackToHeap)
+        .leakDetectionPolicy(LeakDetectionPolicy.Disabled)
+        .build();
+```
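+
+For instance, a minimal sketch of wiring the allocator into a Netty server bootstrap or
+allocating buffers directly (the bootstrap wiring below is plain Netty API usage, shown
+only for illustration):
+
+```java
+// Use the configured allocator for all channel allocations (standard Netty channel options)
+ServerBootstrap bootstrap = new ServerBootstrap();
+bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
+
+// Or allocate buffers directly from the same allocator
+ByteBuf buffer = allocator.directBuffer(1024);
+buffer.release();
+```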
+
+### Component changes
+
+In addition to using the policy-based allocator wrapper, each component will have
+additional changes.
+
+#### Pulsar broker
+
+Add configuration options in `broker.conf` to allow configuration of the allocator, e.g.:
+
+```properties
+allocatorPoolingPolicy=PooledDirect
+allocatorPoolingConcurrency=4
+allocatorOomPolicy=FallbackToHeap
+allocatorLeakDetectionPolicy=Disabled
+```
+
+##### Managed ledger cache
+
+Currently, in the Pulsar broker, the only memory pooled from the direct memory region, in
+addition to the regular IO buffers, is the managed ledger cache. This cache is used to dispatch
+messages directly to consumers (once a message is persisted), avoiding reads from bookies for
+consumers that are caught up with producers.
+
+By default, the managed ledger cache size will be set to 1/3rd of the total available
+direct memory (or heap if pooling is disabled).
+
+The setting will be left empty to indicate the default dynamic behavior:
+
+```properties
+managedLedgerCacheSizeMb=
+```
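+
+As an illustration, a minimal sketch of the dynamic default, assuming the allocator's
+pooling policy is available to the broker (the variable names here are only for illustration):
+
+```java
+// Sketch: when managedLedgerCacheSizeMb is left empty, size the cache as 1/3 of the
+// memory region the allocator draws from (direct memory, or heap if pooling is disabled)
+long maxMemory = (poolingPolicy == PoolingPolicy.PooledDirect)
+        ? io.netty.util.internal.PlatformDependent.maxDirectMemory()
+        : Runtime.getRuntime().maxMemory();
+long managedLedgerCacheSizeBytes = maxMemory / 3;
+```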
+
+#### BookKeeper Client
+
+Add options to configure the allocator in the `ClientConfiguration` object.
+
+#### Bookie
+
+Add options to configure the allocator in the `ServerConfiguration` object and in `bookkeeper.conf`.
+
+By default, in Pulsar we configure BookKeeper to use `DbLedgerStorage`. This storage
+implementation has two main sources of memory allocations: the read and write caches.
+
+By default, the configured direct memory region will be divided into 3 portions:
+ * IO buffers - 50%, capped at 4 GB
+ * Write cache - 25%
+ * Read cache - 25%
+
+If there is a lot of direct memory available, at most 4 GB will be assigned to IO buffers and
+the rest will be split between the read and write caches.
+
+This will still not take into account the memory used by the RocksDB block cache, since that
+is allocated from within the JNI library and is not accounted for in the JVM heap or
+direct memory regions.
+
+The rule of thumb here would be to default the block cache to a size pegged to the direct
+memory size, say 1/5th of it.
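+
+A minimal sketch of this sizing logic under the stated defaults (variable names are
+illustrative only):
+
+```java
+// IO buffers take 50% of direct memory, capped at 4 GB; the remainder is split evenly
+// between the write and read caches. The RocksDB block cache is pegged to 1/5 of
+// direct memory and is allocated by the JNI library, outside the JVM-managed regions.
+long directMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory();
+long ioBuffers = Math.min(directMemory / 2, 4L * 1024 * 1024 * 1024);
+long writeCache = (directMemory - ioBuffers) / 2;
+long readCache = directMemory - ioBuffers - writeCache;
+long rocksDbBlockCache = directMemory / 5;
+```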
+
+#### Pulsar Client
+
+Add options to configure allocator policies in `PulsarClientBuilder`.
+
+Additionally, for `PulsarClient` we should be able to define the maximum amount of memory
+that a single instance is allowed to use.
+
+This memory will be used when accumulating messages in the producers' pending-message
+queues or in the consumers' receive queues.
+
+When the assigned client memory is filled up, the following actions will be taken:
+
+ * For producers, it will be the same as the producer-queue-full condition, with either an
+   immediate send error or blocking behavior, depending on the existing configuration.
+ * For consumers, the flow control mechanism will be slowed down by not asking the
+   brokers for more messages once the memory is full.
+
+A reasonable default might be to use 64 MB per client instance, which will be shared
+across all producers and consumers created by that instance.
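+
+As a sketch of how this could surface in the API (the `memoryLimit()` method name below is
+an assumption used for illustration, not something defined by this proposal):
+
+```java
+// Hypothetical builder option for the per-client memory limit
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .memoryLimit(64 * 1024 * 1024) // 64 MB shared by all producers and consumers
+        .build();
+```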