Posted to dev@pulsar.apache.org by Matteo Merli <mm...@apache.org> on 2020/12/15 20:20:17 UTC

[PROPOSAL] PIP 74: Pulsar client memory limits

https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits


## Motivation

Currently, there are multiple settings in producers and consumers that control
the sizes of the message queues. These are ultimately used to control the
amount of memory used by the Pulsar client.

There are a few issues with the current approach that make it complicated, in
non-trivial scenarios, to select an overall configuration that leads to a
predictable amount of memory usage:

 1. The settings are in terms of "number of messages", so one has to adjust
    based on the expected message size.

 2. The settings are per producer/consumer. If an application has a large
    (or simply unknown) number of producers/consumers, it's very difficult
    to select an appropriate value for queue sizes. The same is true for topics
    that have many partitions.

## Goal

Allow specifying a maximum amount of memory for a given Pulsar client.
Producers and consumers will compete for the assigned memory.

The scope of this proposal is to track the "direct" memory used to hold
outgoing or incoming message payloads. The usage of JVM heap memory is outside
the scope, though normally it represents only a small fraction of the payload
size.

The change will be done in multiple steps:
 1. Introduce a new API to set the memory limit, keeping it disabled by default
  1.1 Implement memory limit for producers
 2. Implement memory limit for consumers
 3. (Once the feature is stable)
    Enable memory limit by default, with 64 MB
 4. Deprecate producer queue size related settings
 5. Ignore producer queue size related settings

## Client API

```java

public interface ClientBuilder {
  // ....

  /**
   * Configure a limit on the amount of direct memory that will be
   * allocated by this client instance.
   * <p>
   * Setting this to 0 will disable the limit.
   *
   * @param memoryLimit
   *            the limit
   * @param unit
   *            the memory limit size unit
   * @return the client builder instance
   */
  ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
}
```

For example:

```java
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .memoryLimit(64, SizeUnit.MEGA_BYTES)
    .build();
```

The same logic described here will be implemented first in Java and then
in the rest of the supported client libraries.

## Implementation

The memory limit is enforced differently for producers than for consumers,
although both will share a common instance of a `MemoryLimitController`,
which is unique per `PulsarClient` instance.

```java
interface MemoryLimitController {

    // Non-blocking
    boolean tryReserveMemory(long size);

    // Blocks until memory is available
    void reserveMemory(long size) throws InterruptedException;

    void releaseMemory(long size);
}
```

The implementation of `MemoryLimitController` will be optimized to avoid
contention across multiple threads trying to reserve and release memory.
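
As an illustration only, a minimal sketch of such a controller is shown below. The class name and all internals are assumptions of this example, not the actual implementation; the idea is a lock-free compare-and-swap fast path for `tryReserveMemory()`, with a lock and condition variable used only by callers that need to block.

```java
// Minimal illustrative sketch (not the actual implementation): a CAS-based
// usage counter, with a lock/condition used only for the blocking path.
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleMemoryLimitController implements MemoryLimitController {
    private final long memoryLimitBytes;
    private final AtomicLong currentUsage = new AtomicLong();
    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition condition = mutex.newCondition();

    SimpleMemoryLimitController(long memoryLimitBytes) {
        this.memoryLimitBytes = memoryLimitBytes;
    }

    @Override
    public boolean tryReserveMemory(long size) {
        while (true) {
            long current = currentUsage.get();
            long newUsage = current + size;
            if (memoryLimitBytes > 0 && newUsage > memoryLimitBytes) {
                // Would exceed the configured limit (0 means limit disabled)
                return false;
            }
            if (currentUsage.compareAndSet(current, newUsage)) {
                return true;
            }
            // Lost a race with another thread: retry
        }
    }

    @Override
    public void reserveMemory(long size) throws InterruptedException {
        if (!tryReserveMemory(size)) {
            mutex.lock();
            try {
                while (!tryReserveMemory(size)) {
                    condition.await();
                }
            } finally {
                mutex.unlock();
            }
        }
    }

    @Override
    public void releaseMemory(long size) {
        currentUsage.addAndGet(-size);
        // Wake up any threads blocked in reserveMemory()
        mutex.lock();
        try {
            condition.signalAll();
        } finally {
            mutex.unlock();
        }
    }
}
```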

### Producer

If the memory limit is set, the producer will first try to acquire the
semaphore that is currently sized based on the producer queue size, then it
will try to reserve the memory, in either a blocking or non-blocking way,
depending on the producer configuration.
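
To make this concrete, a rough sketch of the producer-side check follows. The field and method names (`conf`, `semaphore`, `memoryLimitController`) are placeholders for this example, not the actual producer internals.

```java
// Illustrative sketch: acquire the existing per-producer queue permit first,
// then reserve memory from the shared MemoryLimitController, rolling back the
// queue permit if the memory reservation fails.
private boolean canEnqueueRequest(int payloadSize) throws InterruptedException {
    if (conf.isBlockIfQueueFull()) {
        // Blocking mode: wait for both the queue permit and the memory
        semaphore.acquire();
        memoryLimitController.reserveMemory(payloadSize);
        return true;
    } else {
        // Non-blocking mode: fail fast if either resource is unavailable
        if (!semaphore.tryAcquire()) {
            return false;
        }
        if (!memoryLimitController.tryReserveMemory(payloadSize)) {
            semaphore.release();
            return false;
        }
        return true;
    }
}
```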

### Consumer

The logic for consumers is slightly more complicated because a client needs to
give permits to brokers before they push messages. Right now, these permits are
counted in terms of messages, and we cannot simply switch to counting bytes
because a consumer may need to consume from multiple topics and partitions.

Because of that, we need to invert the order, by using the memory limit
controller as a way to interact with the flow control, reserving memory
after we have already received the messages.
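
A minimal sketch of this inverted order, with hypothetical hook and field names, could look like the following. Since the message has already been pushed by the broker, the reservation is purely an accounting step (the boolean result is ignored); the actual mechanism for staying under the limit is the dynamic receiver queue described below.

```java
// Hypothetical consumer hooks: account for memory when a message arrives and
// release it when the application takes the message out of the receiver queue.
void onMessageArrived(Message<byte[]> msg) {
    // The payload is already in memory, so this only records the usage
    memoryLimitController.tryReserveMemory(msg.getData().length);
    incomingMessages.add(msg);
}

Message<byte[]> dequeueMessage() throws InterruptedException {
    Message<byte[]> msg = incomingMessages.take();
    memoryLimitController.releaseMemory(msg.getData().length);
    return msg;
}
```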

The proposal is to keep the receiver queue size as a configuration setting,
although treating it more as the "max receiver queue size".

When a consumer is created and the memory limit is set, it will use these
parameters:
 * `maxReceiverQueueSize`: the value configured by the application
 * `currentReceiverQueue`: a dynamic limit that will be auto-adjusted, starting
    from an initial value (e.g. 1) up to the max.

The goal is to step up the `currentReceiverQueue` if and only if:
 1. Doing it would improve throughput
 2. We have enough memory to do it


#### Controlling throughput

The rationale for `(1)` is that increasing `currentReceiverQueue` increases the
number of pre-fetched messages for the consumer. The goal of the prefetch queue
is to make sure an application calling `receive()` is never blocked waiting
when there are messages available.

Therefore, we can determine that the `currentReceiverQueue` is limiting the
throughput if:

 1. The application calls `receive()` and there are no pre-fetched messages
 2. And we know that there are messages pending to be sent to this consumer.
    For this, in the case of exclusive/failover subscriptions, we can rely on
    the last message id, while for other subscription types, we need to
    introduce a new special command that asks the broker whether it is waiting
    on the consumer to grant more permits.

If these conditions are true, we can increase the `currentReceiverQueue` to
increase the load.

With this mechanism, we can start with a very small window and expand as needed,
maintaining the minimum size that is able to sustain the requested throughput.
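
As a sketch of how condition `(1)` could be detected (the helper names here are assumptions of this example, not existing client internals):

```java
// Hypothetical check, evaluated when receive() finds no pre-fetched message.
// For exclusive/failover subscriptions, "pending on broker" can be derived
// from the last message id; other subscription types would rely on the new
// broker command mentioned above.
private boolean isReceiverQueueLimitingThroughput() {
    return incomingMessages.isEmpty() && hasMessagesPendingOnBroker();
}
```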

#### Limiting consumer memory

In order to limit the consumer memory, we would only increase the
`currentReceiverQueue` if the memory usage is below a certain threshold
(e.g. 75%). Also, if the usage reaches a higher threshold (e.g. 95%), we will
reduce the `currentReceiverQueue` for all the consumers.
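
Putting the two conditions together, the adjustment could look roughly like the sketch below; the thresholds, field names, and the way the usage ratio is obtained from the shared controller are all assumptions of this example.

```java
// Hypothetical adjustment combining the throughput check from the previous
// sketch with the memory thresholds. usageRatio is the fraction of the
// client-wide memory limit currently reserved.
private void adjustReceiverQueue(boolean throughputLimited, double usageRatio) {
    if (usageRatio >= 0.95) {
        // High watermark reached: shrink the window (applied to all consumers)
        currentReceiverQueue = Math.max(1, currentReceiverQueue / 2);
    } else if (throughputLimited && usageRatio < 0.75
            && currentReceiverQueue < maxReceiverQueueSize) {
        // Throughput is limited and memory is available: grow the window
        currentReceiverQueue = Math.min(currentReceiverQueue * 2, maxReceiverQueueSize);
    }
}
```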



--
Matteo Merli
<mm...@apache.org>

Re: [PROPOSAL] PIP 74: Pulsar client memory limits

Posted by Matteo Merli <ma...@gmail.com>.
On Thu, Dec 17, 2020 at 9:57 PM Yuto Furuta <yf...@yahoo-corp.jp> wrote:

> 1. Why are producer queue size related settings deprecated/ignored?
> It may be true that it is easier for users to set memory sizes than queue sizes.
> However, there may be some users who have already set appropriate values.
> So, I think these settings don't need to be deprecated/ignored.

Ok, instead of deprecating it, we could turn it off by default
(once the memory limit is considered stable)
and only use it (in addition to the memory limit) if the application sets it.

> 2. Can producer queue size be changed dynamically?
It's not possible today because the queue size control is based on a
semaphore, which cannot be updated.

Re: [PROPOSAL] PIP 74: Pulsar client memory limits

Posted by Yuto Furuta <yf...@yahoo-corp.jp>.
Matteo,

Thank you for your proposal.
I have two questions about it.

1. Why are producer queue size related settings deprecated/ignored?
It may be true that it is easier for users to set memory sizes than queue sizes.
However, there may be some users who have already set appropriate values.
So, I think these settings don't need to be deprecated/ignored.

2. Can producer queue size be changed dynamically?

Best regards,

Yuto
--
Yuto Furuta
yfuruta@yahoo-corp.jp

