You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Matteo Merli <mm...@apache.org> on 2020/07/14 06:25:19 UTC

[DISCUSSION] PIP 68: Exclusive Producer

https://github.com/apache/pulsar/wiki/PIP-68:-Exclusive-Producer


## Goal

Allow applications to require exclusive producer access on a topic in order to
achieve a "single-writer" situation.

There are several use cases that require exclusive access for a single writer,
of which few examples are:

 * Ensuring a linear non-interleaved history of messages
 * Providing basic mechanism for leader election

From Pulsar perspective, this feature needs to provide 2 fundamental properties:

 1. Guaranteed single-writer - There should be 1 single writer in any
combination
    of errors.
 2. Producer fencing - Once a producer looses the exclusive access, no more
    messages from that producer can be published on the topic.

One example of concrete use case for this feature is in the Pulsar Functions
metadata controller. In order to write a single linear history of all the
functions metadata updates, it requires to elect 1 leader and that all the
"decisions" made by this leader be written on the topic.

If we assume single-writer and fencing, we can thus guarantee that the topic
will contain different segments of updates, one per each successive leader, and
that there will be no interleaving across different leaders.

## Client API

An application can require exclusive producer access on a topics by configuring
it on the producer builder.

```java
/**
 * The type of access to the topic that the producer requires.
 */
public enum ProducerAccessMode {
    /**
     * By default multiple producers can publish on a topic.
     */
    Shared,

    /**
     * Require exclusive access for producer. Fail immediately if there's
     * already a producer connected.
     */
    Exclusive,

    /**
     * Producer creation is pending until it can acquire exclusive access.
     */
    WaitForExclusive,
}

public interface ProducerBuilder<T> {
  // .....

  /**
   * Configure the type of access mode that the producer requires on the topic.
   *
   * <p>Possible values are:
   * <ul>
   * <li>{@link ProducerAccessMode#Shared}: By default multiple producers can
   *     publish on a topic
   * <li>{@link ProducerAccessMode#Exclusive}: Require exclusive access for
   *     producer. Fail immediately if there's already a producer connected.
   * <li>{@link ProducerAccessMode#WaitForExclusive}: Producer creation is
   *       pending until it can acquire exclusive access
   * </ul>
   *
   * @see ProducerAccessMode
   * @param accessMode
   *            The type of access to the topic that the producer requires
   * @return the producer builder instance
   */
  ProducerBuilder<T> accessMode(ProducerAccessMode accessMode);
}
```

For example:

```java

Producer<String> producer = client.newProducer(Schema.STRING)
      .topic("my-topic")
      .accessMode(ProducerAccessMode.Exclusive)
      .create();
```

## Semantic

Once an application is successful in creating a producer with "exclusive"
access, either using `Exclusive` or `WaitForExclusive` mode, the particular
instance of that application is guaranteed to be only writer on that topic.

Other producers trying to produce on this topic, will either receive an error
immediately (if using `Exclusive` or `Shared` access mode) or will have to
wait until the current exclusive producer session is closed.

The application holding the "exclusive" access right to the topic can also
lose this right. This will happen for example if the producer experiences a
network partition with the broker. In this case, this producer will be evicted
and a new producer will be picked to be the next exclusive producer.

The application that has lost the exclusive producer access, will not be
immediately notified of that event, but it's guaranteed to receive a
`ProducerFencedException` the next time it attempts to publish a message on
the topic.

Once the application receives `ProducerFencedException`, the producer instance
is permanently fenced off. To restart producing on the topic, it will be
necessary to create a new instance.

### WaitForExclusive

`WaitForExclusive` access mode is very similar as `Exclusive` with just a
difference on what happens at the producer creation.

 * `Exclusive` : Try to become the only producer and fail immediately if there's
   already another producer connected.
 * `WaitForExclusive` : Same as before but, if there are producers connected,
   suspend the producer creation until we can be promoted to be exclusive
   producer.

In practice, the `ProducerBuilder.create()` will be blocking until we are
promoted to exclusive producer.

This access mode makes it very simple for applications to implement a leader
election scheme, in which the producers that succeeds in becoming the exclusive
producer is treated as the "leader".

## Implementation

A new concept of "topic epoch" is introduced for this feature. The epoch is
a counter that is incremented each time a new exclusive producer takes over
the topic.

The epoch is stored in metadata as part of ManagedLedger properties and it has
to be updated each time a new producer attempts to take over.

The `CommandProducer` protobuf command will be extended with these new fields:

```protobuf
    // Require that this producers will be the only producer allowed
on the topic
    optional ProducerAccessMode producer_access_mode = 10 [default = Shared];

    // Topic epoch is used to fence off producers that reconnects after a new
    // exclusive producer has already taken over. This id is assigned by the
    // broker on the CommandProducerSuccess. The first time, the client will
    // leave it empty and then it will always carry the same epoch number on
    // the subsequent reconnections.
    optional uint64 topic_epoch = 11;
```

When a new producer tries to create a session for the 1st time, it will be
leaving the `topic_epoch` empty.

On `CommandProducerSuccess`, the broker will inform the exclusive producer of
it's associated epoch:

```protobuf

    // The topic epoch assigned by the broker. This field will only be set if we
    // were requiring exclusive access when creating the producer.
    optional uint64 topic_epoch = 5;
```

From that point on, the producer instance will always be associated with that
particular topic epoch.

When a producer reconnects, it will pass the existing topic epoch and if it's
lower than the current topic epoch that the broker has, the producer will be
considered "fenced" permanently.

That will ensure that no inflight message from this fenced producer will ever
make its way in the topic.

### Failure detection

The broker owning a particular topic is the one that detect that the current
exclusive producer has failed.

The producer "ownership" of the topic is immediately removed when either:
 1. The client sends a CloseProducer command, for a graceful close
 2. The TCP connection for the producer is closed (perhaps due to the internal
    keepalive probes).

In both cases, the topic is now up for grab for a new producer.

There are 2 scenarios:
 1. The old producer is the first in re-attempting to produce on the topic.
    In this case, the topic epoch will still match the producer epoch and the
    topic epoch doesn't need to get incremented.

 2. A new producer is the first in reconnecting and it will force the
    increment on the topic epoch.


## Example of usage

A typical scenario in which the exclusive producer is useful is to maintain
a consistent state through a compacted topic.

Application will use that to elect a "leader" through which all the updates
are written on the Pulsar topic.

When the producer becomes the exclusive writer on the topic, it will also
need to make to have applied all the update from the previous producers.

Example:


```java
Map<String, T> myState = new HashMap<>();

Producer<T> producer = client.newProducer(Schema.JSON(T.class))
      .topic("my-topic")
      .accessMode(ProducerAccessMode.WaitForExclusive)
      .create();

Reader<T> reader = client.newReader(Schema.JSON(T.class))
       .topic("my-topic")
       .readCompacted(true)
       .create();

while (reader.hasMessageAvailable()) {
  Message<T> msg = reader.readNext();
  myState.put(msg.getKey(), msg.getValue());
}
```

Done. We're now ready to use the producer as a write-ahead-log for the
updates on the state

```java
void write(String key, T value) {
  producer.newMessage().key(key).value(value).send();
  myState.put(key, value);
}
```


--
Matteo Merli
<mm...@apache.org>