Posted to dev@pulsar.apache.org by Matteo Merli <mm...@apache.org> on 2019/10/09 18:35:35 UTC

[DISCUSS] PIP-45 - Pluggable metadata interface

https://github.com/apache/pulsar/wiki/PIP-45:-Pluggable-metadata-interface

# PIP-45 - Pluggable metadata interface

## Goals

Provide a unified pluggable interface that can abstract all the Pulsar metadata
interactions.

After the refactoring, the default implementation will still be based on ZooKeeper and it will
be 100% compatible with the existing metadata. The metadata will be kept in the same location
and in exactly the same format.

Once we have the interface defined, we could have multiple backend implementations:
  * ZooKeeper
  * Etcd
  * In memory - for unit-test purposes
  * On local disk - for usage in Pulsar standalone

## Context

Pulsar is currently using ZooKeeper for metadata and coordination purposes. These accesses are
made from Pulsar brokers and from some administrative CLI tools. BookKeeper already supports
a pluggable metadata store.

Additionally, the ZooKeeper client API is accessed from several places in the codebase, so we
first need to consolidate all these accesses behind a single generic `MetadataStore` interface.

This interface is based on the needs that Pulsar has in interacting with metadata and on the
semantics offered by existing metadata stores (e.g. ZooKeeper, Etcd and others). The API will be
considered "Beta" (meaning it could evolve in a breaking way) until we have at least a few
concrete implementations. Therefore, at least initially, this will be an internal Pulsar API and
it will not be open to user plugins.

## Refactoring steps

### 1. Define metadata store API

The metadata store is modeled after a basic Key-Value interface with
`compareAndSet()` updates
based on the version of a particular value.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public interface MetadataStore extends AutoCloseable {

    /**
     * Read the value of one key, identified by the path.
     *
     * The async call will return a future that yields a {@link GetResult} that will contain the value and the
     * associated {@link Stat} object.
     *
     * If the value is not found, the future will yield an empty {@link Optional}.
     *
     * @param path
     *            the path of the key to get from the store
     * @return a future to track the async request
     */
    CompletableFuture<Optional<GetResult>> get(String path);

    /**
     * Return all the nodes (lexicographically sorted) that are children of the specified path.
     *
     * If the path itself does not exist, it will return an empty list.
     *
     * @param path
     *            the path of the key whose children should be listed
     * @return a future to track the async request
     */
    CompletableFuture<List<String>> getChildren(String path);

    /**
     * Read whether a specific path exists.
     *
     * Note: In case of keys with multiple levels (e.g. '/a/b/c'), checking the existence of a parent (e.g. '/a')
     * might not necessarily return true, unless the key had been explicitly created.
     *
     * @param path
     *            the path of the key to check on the store
     * @return a future to track the async request
     */
    CompletableFuture<Boolean> exists(String path);

    /**
     * Put a new value for a given key.
     *
     * The caller can specify an expected version to be atomically checked against the current version of the
     * stored data.
     *
     * The future will return the {@link Stat} object associated with the newly inserted value.
     *
     * @param path
     *            the path of the key to put in the store
     * @param value
     *            the value to store
     * @param expectedVersion
     *            if present, the version will have to match with the currently stored value for the operation to
     *            succeed. Use -1 to enforce a non-existing value.
     * @throws BadVersionException
     *             if the expected version doesn't match the actual version of the data
     * @return a future to track the async request
     */
    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion);

    /**
     * Delete the key identified by the path.
     *
     * @param path
     *            the path of the key to delete from the store
     * @param expectedVersion
     *            if present, the version will have to match with the currently stored value for the operation to
     *            succeed
     * @throws NotFoundException
     *             if the path is not found
     * @throws BadVersionException
     *             if the expected version doesn't match the actual version of the data
     * @return a future to track the async request
     */
    CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);
}
```
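To make the versioning rules concrete, here is a minimal in-memory sketch of `get()`, `put()` and `getChildren()`, in the spirit of the "in memory" backend listed above. It is only an illustration, not the proposed implementation: the `Stat`, `GetResult` and `BadVersionException` classes are simplified stand-ins assumed for this example, and starting new keys at version 0 is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory sketch of the MetadataStore semantics; Stat, GetResult and
// BadVersionException are simplified stand-ins, not the real Pulsar classes.
public class InMemoryMetadataStore {

    public static final class Stat {
        public final String path;
        public final long version;

        Stat(String path, long version) {
            this.path = path;
            this.version = version;
        }
    }

    public static final class GetResult {
        public final byte[] value;
        public final Stat stat;

        GetResult(byte[] value, Stat stat) {
            this.value = value;
            this.stat = stat;
        }
    }

    public static final class BadVersionException extends Exception {
        BadVersionException(String message) {
            super(message);
        }
    }

    // Sorted map: keeps prefix scans for getChildren() simple.
    private final TreeMap<String, GetResult> data = new TreeMap<>();

    public synchronized CompletableFuture<Optional<GetResult>> get(String path) {
        return CompletableFuture.completedFuture(Optional.ofNullable(data.get(path)));
    }

    public synchronized CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion) {
        GetResult current = data.get(path);
        if (expectedVersion.isPresent()) {
            long expected = expectedVersion.get();
            // -1 means "the key must not exist yet"; any other value must equal the stored version.
            boolean matches = expected == -1
                    ? current == null
                    : current != null && current.stat.version == expected;
            if (!matches) {
                CompletableFuture<Stat> failed = new CompletableFuture<>();
                failed.completeExceptionally(new BadVersionException("Version mismatch on " + path));
                return failed;
            }
        }
        // New keys start at version 0; every update bumps the version by one.
        Stat stat = new Stat(path, current == null ? 0 : current.stat.version + 1);
        data.put(path, new GetResult(value, stat));
        return CompletableFuture.completedFuture(stat);
    }

    public synchronized CompletableFuture<List<String>> getChildren(String path) {
        String prefix = path.endsWith("/") ? path : path + "/";
        TreeSet<String> children = new TreeSet<>(); // deduplicated and lexicographically sorted
        for (String key : data.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // keys sharing a prefix are contiguous in sorted order
            }
            String rest = key.substring(prefix.length());
            int slash = rest.indexOf('/');
            children.add(slash < 0 ? rest : rest.substring(0, slash));
        }
        return CompletableFuture.completedFuture(new ArrayList<>(children));
    }
}
```

With this model, `put(path, value, Optional.of(-1L))` behaves as "create if absent", while passing back the version obtained from a previous `get()` turns `put()` into a compare-and-set.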

Additionally, when the `MetadataStore` is created, it should be
possible to specify an observer
function that will be triggered whenever there are changes on a
sub-tree of the specified keys.

This will be used to keep local caches updated without any polling.
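One way a backend could route such change events to observers is sketched below. `SubtreeNotifier`, `register()` and `onChange()` are hypothetical names, not part of the proposed API; the only rule taken from the text is that an observer fires for any change within the sub-tree it was registered on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: fan out change notifications to observers registered on sub-trees.
public class SubtreeNotifier {

    private static final class Registration {
        final String root;
        final Consumer<String> observer;

        Registration(String root, Consumer<String> observer) {
            this.root = root;
            this.observer = observer;
        }
    }

    // Copy-on-write list: registrations are rare, notifications are frequent.
    private final List<Registration> registrations = new CopyOnWriteArrayList<>();

    /** Register an observer for every change at or below 'root'. */
    public void register(String root, Consumer<String> observer) {
        registrations.add(new Registration(root, observer));
    }

    /** Called by the backend (e.g. from a ZooKeeper watch) when 'path' is created, modified or deleted. */
    public void onChange(String path) {
        for (Registration r : registrations) {
            if (isInSubtree(path, r.root)) {
                r.observer.accept(path);
            }
        }
    }

    // "/a/b" is inside the sub-tree of "/a", but "/ab" is not.
    static boolean isInSubtree(String path, String root) {
        if (root.equals("/")) {
            return path.startsWith("/");
        }
        return path.equals(root) || path.startsWith(root + "/");
    }
}
```

A local cache would typically register its own invalidation method on the sub-tree it mirrors, so that stale entries are dropped as soon as the store reports a change, without any polling.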

### 2. Define coordination interface

The Pulsar broker uses "coordination" in several different places. Examples are:

 * List of active brokers and their current load data report
 * Acquire ownership of a portion of a namespace's topics (bundle)
 * Leader election (load manager)
 * Counters for generating unique prefix identifiers

While in general these can be implemented through a key-value interface with the help of
flags (e.g. "ephemeral" nodes in ZooKeeper), each backend system might have a more direct way
to implement them.
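As an illustration of the generic key-value approach, a counter like the one used for unique prefix identifiers can be built from nothing more than a versioned read and a compare-and-set, retried until it wins. The synchronous `VersionedKv` class below is a hypothetical stand-in; a real backend would use the asynchronous `MetadataStore` calls, or a more direct primitive if the backend offers one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: getNextCounterValue() built only from a versioned read plus compare-and-set.
public class CasCounter {

    private static final class Entry {
        final long value;
        final long version;

        Entry(long value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    /** Minimal in-memory stand-in for a versioned key-value store. */
    public static final class VersionedKv {
        private final Map<String, Entry> data = new HashMap<>();

        synchronized Entry read(String path) {
            return data.get(path);
        }

        /** Store 'value' only if the current version equals expectedVersion (-1 = key must not exist). */
        synchronized boolean compareAndSet(String path, long value, long expectedVersion) {
            Entry current = data.get(path);
            long currentVersion = current == null ? -1 : current.version;
            if (currentVersion != expectedVersion) {
                return false;
            }
            data.put(path, new Entry(value, currentVersion + 1));
            return true;
        }
    }

    private final VersionedKv kv;

    public CasCounter(VersionedKv kv) {
        this.kv = kv;
    }

    /** Atomically increment the counter at 'path' and return the new, unique value. */
    public long getNextCounterValue(String path) {
        while (true) {
            Entry current = kv.read(path);
            long value = current == null ? 0 : current.value;
            long version = current == null ? -1 : current.version;
            if (kv.compareAndSet(path, value + 1, version)) {
                return value + 1;
            }
            // Lost a race with another writer: re-read and retry.
        }
    }
}
```

Because every successful write is conditioned on the version that was read, two callers can never both return the same value for a given path.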

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
 * Interface for the coordination service. Provides abstraction for distributed locks and leader election.
 */
public interface CoordinationService extends AutoCloseable {

    /**
     * Read the content of an existing lock.
     *
     * @param path
     *            the path of the resource whose lock should be read
     * @return a future that yields the content of the lock, or an empty {@link Optional} if the lock is not taken
     * @throws CoordinationServiceException
     *             if there's a failure in reading the lock
     */
    CompletableFuture<Optional<byte[]>> readLock(String path);

    /**
     * Acquire a lock on a shared resource.
     *
     * If the lock is already taken, this operation will fail immediately.
     *
     * Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
     * guarantee that no one else also thinks it owns the same resource. The caller will have to deal with these race
     * conditions when using the resource itself (e.g. using compareAndSet() or fencing mechanisms).
     *
     * @param path
     *            the path of the resource on which to acquire the lock
     * @param content
     *            the payload of the lock
     * @return a future that will track the completion of the operation
     * @throws ResourceBusyException
     *             if the lock is already taken
     * @throws CoordinationServiceException
     *             if there's a failure in acquiring the lock
     */
    CompletableFuture<ResourceLock> acquireLock(String path, byte[] content);

    /**
     * List all the locks that are children of a specific path.
     *
     * For example, given locks: <code>/a/b/lock-1</code> and <code>/a/b/lock-2</code>, the <code>listLocks()</code>
     * will return a list of <code>["lock-1", "lock-2"]</code>.
     *
     * @param path
     *            the prefix path to get the list of locks
     * @return a future that will track the completion of the operation
     * @throws CoordinationServiceException
     *             if there's a failure in getting the list of locks
     */
    CompletableFuture<List<String>> listLocks(String path);

    /**
     * Try to become the leader for the specified resource.
     *
     * If there's already a leader, this request will be kept pending until the current process becomes the leader.
     *
     * Warning: because of the distributed nature of the leader election, having been promoted to "leader" status
     * will never provide a strong guarantee that no one else also thinks it's the leader. The caller will have to
     * deal with these race conditions when using the resource itself (e.g. using compareAndSet() or fencing
     * mechanisms).
     *
     * @param path
     *            the path of the resource of which to become the leader
     * @param content
     *            the payload of the lock
     * @return a future that will track the completion of the operation
     * @throws CoordinationServiceException
     *             if there's a failure in the leader election
     */
    CompletableFuture<ResourceLock> becomeLeader(String path, byte[] content);

    /**
     * Increment a counter identified by the specified path and return the current value.
     *
     * The counter value will be guaranteed to be unique within the context of the path.
     *
     * @param path
     *            the path that identifies a particular counter
     * @return a future that will track the completion of the operation
     * @throws CoordinationServiceException
     *             if there's a failure in incrementing the counter
     */
    CompletableFuture<Long> getNextCounterValue(String path);
}


/**
 * Represents a lock that the current process holds on a shared resource.
 */
public interface ResourceLock {

    /**
     * @return the content associated with the lock
     */
    byte[] getContent();

    /**
     * Release the lock on the resource.
     *
     * @return a future to track when the release operation is complete
     */
    CompletableFuture<Void> release();

    /**
     * Get a future that can be used to get notified when the lock is no longer valid.
     *
     * Note: the future will not be triggered when the lock is voluntarily released.
     *
     * @return a future to get notification if the lock is expired
     */
    CompletableFuture<Void> getLockExpiredFuture();
}
```
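A minimal in-process sketch of the `acquireLock()` / `ResourceLock` contract follows. It is an assumption-laden illustration, not a proposed backend: `SimpleLockService`, `SimpleResourceLock` and `expire()` are hypothetical names, and `putIfAbsent` plays the role that a "create only if absent" `put()` (expected version -1) would play in a real store. A real backend would also tie the lock to a session (for example a ZooKeeper ephemeral node), which is what would ultimately complete the expiration future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-process sketch of the acquireLock()/ResourceLock contract.
public class SimpleLockService {

    public static final class ResourceBusyException extends Exception {
        public ResourceBusyException(String path) {
            super("Lock already taken: " + path);
        }
    }

    /** Simplified counterpart of the ResourceLock interface. */
    public final class SimpleResourceLock {
        private final String path;
        private final byte[] content;
        private final CompletableFuture<Void> expiredFuture = new CompletableFuture<>();

        private SimpleResourceLock(String path, byte[] content) {
            this.path = path;
            this.content = content;
        }

        public byte[] getContent() {
            return content;
        }

        public CompletableFuture<Void> release() {
            // Conditional remove: a stale handle can never delete a lock now held by someone else.
            // A voluntary release deliberately does not complete expiredFuture.
            locks.remove(path, this);
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> getLockExpiredFuture() {
            return expiredFuture;
        }
    }

    private final ConcurrentHashMap<String, SimpleResourceLock> locks = new ConcurrentHashMap<>();

    public CompletableFuture<SimpleResourceLock> acquireLock(String path, byte[] content) {
        SimpleResourceLock candidate = new SimpleResourceLock(path, content);
        // putIfAbsent is the atomic "create if not exists" step; fail immediately if the lock is taken.
        if (locks.putIfAbsent(path, candidate) != null) {
            CompletableFuture<SimpleResourceLock> busy = new CompletableFuture<>();
            busy.completeExceptionally(new ResourceBusyException(path));
            return busy;
        }
        return CompletableFuture.completedFuture(candidate);
    }

    /** Simulates the backend revoking a lock, e.g. after a session expiry. */
    public void expire(String path) {
        SimpleResourceLock lock = locks.remove(path);
        if (lock != null) {
            lock.expiredFuture.complete(null);
        }
    }
}
```

As the warning in the interface notes, even with this contract a lock holder should still guard writes to the protected resource with `compareAndSet()` or fencing, since a lock can expire while its owner is still acting on it.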

### 3. Port ManagedLedger to use MetadataStore

ManagedLedger is already using an abstraction for metadata access (see
`MetaStore`). It will be
easy to convert that to the `MetadataStore` API.

### 4. Define metadata cache API

Currently, most metadata read accesses happen through the `ZooKeeperCache` and additional
classes based on it, like `ZooKeeperDataCache` and `ZooKeeperChildrenCache`.

The cache needs to be ported to the `MetadataStore` API, along with support for receiving
notifications and invalidating stale entries.
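A minimal sketch of such a cache is shown below, assuming the loader function wraps `MetadataStore.get()` plus deserialization and that `invalidate()` is wired to the store's change observer. `ReadThroughCache` and its methods are hypothetical names chosen for illustration.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a typed read-through cache on top of an async loader.
public class ReadThroughCache<T> {

    private final Function<String, CompletableFuture<Optional<T>>> loader;
    private final ConcurrentHashMap<String, CompletableFuture<Optional<T>>> entries = new ConcurrentHashMap<>();

    public ReadThroughCache(Function<String, CompletableFuture<Optional<T>>> loader) {
        this.loader = loader;
    }

    public CompletableFuture<Optional<T>> get(String path) {
        // Caching the future (not the value) lets concurrent readers share a single in-flight read.
        CompletableFuture<Optional<T>> future = entries.computeIfAbsent(path, loader);
        // Do not keep failed reads around: drop them so the next get() retries.
        future.whenComplete((value, error) -> {
            if (error != null) {
                entries.remove(path, future);
            }
        });
        return future;
    }

    /** Drop a stale entry; the next get() reloads it from the store. */
    public void invalidate(String path) {
        entries.remove(path);
    }
}
```

This sketch ignores one subtle race: a notification that arrives while a load is still in flight can leave a stale value cached. A production cache would need to account for that, for example by comparing versions.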

An additional concept that should be added to the cache is the atomic "read-modify-update"
operation.

This is currently performed from many places in the code base and it should be consolidated
into a single implementation. For example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public interface TypedMetadataCache<T> {
    // ...

    /**
     * Perform an atomic read-modify-update of the value.
     *
     * The modify function can potentially be called multiple times if there are concurrent updates happening.
     *
     * @param path
     *            the path of the value
     * @param modifyFunction
     *            a function that will be passed the current value and returns a modified value to be stored
     * @return a future to track the completion of the operation
     */
    CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction);
}
```
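A sketch of why the modify function may run more than once: an optimistic read-modify-update reads a value and its version, applies the function, and writes back only if the version is unchanged, retrying otherwise. The in-memory versioned map below stands in for `MetadataStore.get()` / `put(expectedVersion)`; `ReadModifyUpdateSketch`, `create()` and `getValue()` are hypothetical helpers, and `NoSuchElementException` is used here in place of a real not-found error.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of readModifyUpdate() as an optimistic compare-and-set retry loop.
public class ReadModifyUpdateSketch<T> {

    private static final class Versioned<V> {
        final V value;
        final long version;

        Versioned(V value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned<T>> store = new HashMap<>();

    public synchronized void create(String path, T value) {
        store.put(path, new Versioned<>(value, 0));
    }

    public synchronized T getValue(String path) {
        Versioned<T> current = store.get(path);
        return current == null ? null : current.value;
    }

    private synchronized Versioned<T> read(String path) {
        return store.get(path);
    }

    // Succeeds only if nobody updated the value since it was read at 'expectedVersion'.
    private synchronized boolean compareAndSet(String path, T value, long expectedVersion) {
        Versioned<T> current = store.get(path);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        store.put(path, new Versioned<>(value, expectedVersion + 1));
        return true;
    }

    public CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction) {
        while (true) {
            Versioned<T> current = read(path);
            if (current == null) {
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(new NoSuchElementException("Not found: " + path));
                return failed;
            }
            // No monitor is held while modifyFunction runs, so other writers may interleave here.
            T updated = modifyFunction.apply(current.value);
            if (compareAndSet(path, updated, current.version)) {
                return CompletableFuture.completedFuture(null);
            }
            // The version changed underneath us: re-read and call modifyFunction again.
        }
    }
}
```

Since the function can be re-applied to a fresher value, it should be free of side effects and derive its result only from its input.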

### 5. Define higher level abstraction for metadata

While the `ZooKeeperCache` is already typed (in order to store objects that are already
deserialized), we should have an additional abstraction layer to mediate access to the metadata.

For example, to get the list of tenants in a cluster, we shouldn't call `MetadataStore.getChildren()`
directly from multiple places. Rather, we need to provide a `ConfigurationStore` interface such as:

```java
public interface ConfigurationStore {
    CompletableFuture<List<String>> getTenants();

    CompletableFuture<List<String>> getNamespaces(String tenant);

    // ....
}
```
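A sketch of how such a facade could keep path knowledge in one place follows. It assumes that tenants live under `/admin/policies/<tenant>` and namespaces directly under their tenant. This mirrors the current ZooKeeper layout only in simplified form (older namespace paths include a cluster level, for example). `MetadataConfigurationStore` and `ChildrenReader` are hypothetical names, with `ChildrenReader` standing in for `MetadataStore.getChildren()`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: a ConfigurationStore-style facade that owns the metadata path layout.
public class MetadataConfigurationStore {

    /** Stand-in for MetadataStore.getChildren(). */
    @FunctionalInterface
    public interface ChildrenReader {
        CompletableFuture<List<String>> getChildren(String path);
    }

    // Assumed root of tenant policies; the only place in this sketch that knows the layout.
    private static final String POLICIES_ROOT = "/admin/policies";

    private final ChildrenReader store;

    public MetadataConfigurationStore(ChildrenReader store) {
        this.store = store;
    }

    public CompletableFuture<List<String>> getTenants() {
        return store.getChildren(POLICIES_ROOT);
    }

    public CompletableFuture<List<String>> getNamespaces(String tenant) {
        return store.getChildren(POLICIES_ROOT + "/" + tenant);
    }
}
```

Callers then ask for "tenants" or "namespaces" rather than building paths, so a future change in layout or backend touches only this class.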

--
Matteo Merli
<mm...@apache.org>