Posted to dev@pulsar.apache.org by Enrico Olivelli <eo...@gmail.com> on 2021/11/15 12:06:08 UTC

Re: [DISCUSSION] PIP-104: Add new consumer type: TableView

Matteo,
sorry for the late reply.

On Thu, 14 Oct 2021 at 01:40, Matteo Merli <matteo.merli@gmail.com> wrote:

> https://github.com/apache/pulsar/issues/12356
>
> --- Pasted here for quoting convenience ---
>
> ## Motivation
>
> In many use cases, applications are using Pulsar consumers or readers to
> fetch all the updates from a topic and construct a map with the latest
> value of each key for the messages that were received. This is very common
> when constructing a local cache of the data.
>
> We want to offer support for this access pattern directly in the Pulsar
> client API, as a way to encapsulate the complexities of setting this up.
>
>
> ## Goal
>
> Provide a view of the topic data in the form of a read-only map that is
> constantly updated with the latest version of each key.
>
> Additionally, let the application specify a listener so that it can perform
> a scan of the map and then receive notifications when new messages are
> received and applied.
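A minimal in-memory sketch of what a "read-only map that is constantly updated" can look like, assuming a single internal update path. `ReadOnlyTable` and its `apply` method are hypothetical names, not part of the Pulsar API. The application holds an unmodifiable view that wraps the live map, so it observes every update but cannot write to it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Pulsar code: only the update path writes to `data`;
// applications get an unmodifiable view that wraps the same map.
final class ReadOnlyTable<T> {
    private final Map<String, T> data = new ConcurrentHashMap<>();
    private final Map<String, T> view = Collections.unmodifiableMap(data);

    // Invoked for each message received from the topic (hypothetical hook).
    void apply(String key, T value) {
        data.put(key, value);
    }

    // The view is not a copy, so later calls to apply() are visible through it;
    // any attempt to modify it throws UnsupportedOperationException.
    Map<String, T> view() {
        return view;
    }
}
```

Wrapping rather than copying keeps reads cheap; the trade-off is that a reader iterating the view may observe updates that arrive mid-iteration.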
>
> ## API Changes
>
> This proposal will only add a new API on the client side.
>
> A new type of consumer will be added, the `TableView`.
>
> Example:
>
> ```java
> TableView<Integer> tableView = pulsarClient.newTableView(Schema.INT32)
>     .topic(topic)
>     .create();
>
> tableView.get("my-key"); // --> 5
> tableView.get("my-other-key"); // --> 7
> ```
>
> When a `TableView` instance is created, it will be guaranteed to already
> have the latest value for each key, as of the time of its creation.
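One way to read this guarantee, sketched over a hypothetical in-memory log (`InMemoryTopic` and `SnapshotTable` are illustrative names, not Pulsar classes): at creation, record where the topic currently ends and do not return until everything up to that point has been applied. With a real Pulsar `Reader`, a comparable loop might call `hasMessageAvailable()` and `readNext()` until no more messages are available; that mapping is an assumption, not a description of the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a topic: an append-only list of entries.
final class InMemoryTopic {
    final List<Map.Entry<String, Integer>> log = new ArrayList<>();

    void publish(String key, int value) {
        log.add(Map.entry(key, value));
    }
}

// Hypothetical sketch of "create() returns only once caught up".
final class SnapshotTable {
    private final Map<String, Integer> data = new HashMap<>();
    private int position = 0; // index of the next log entry to apply

    private SnapshotTable() {}

    // Records where the topic ends right now and applies everything up to
    // that point before returning, so every key holds its latest value.
    static SnapshotTable create(InMemoryTopic topic) {
        SnapshotTable table = new SnapshotTable();
        int endAtCreation = topic.log.size();
        table.applyUpTo(topic, endAtCreation);
        return table;
    }

    // Applies messages published after creation (a real client would do
    // this continuously in the background).
    void catchUp(InMemoryTopic topic) {
        applyUpTo(topic, topic.log.size());
    }

    Integer get(String key) {
        return data.get(key);
    }

    private void applyUpTo(InMemoryTopic topic, int end) {
        while (position < end) {
            Map.Entry<String, Integer> entry = topic.log.get(position++);
            data.put(entry.getKey(), entry.getValue());
        }
    }
}
```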
>
> ### API additions
>
> ```java
> interface PulsarClient {
>     // ....
>     <T> TableViewBuilder<T> newTableView(Schema<T> schema);
> }
>
> interface TableViewBuilder<T> {
>     TableViewBuilder<T> loadConf(Map<String, Object> config);
>     TableView<T> create() throws PulsarClientException;
>     CompletableFuture<TableView<T>> createAsync();
>     TableViewBuilder<T> topic(String topic);
>     TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
> }
>
> interface TableView<T> extends Closeable {
>
>     // Similar methods as java.util.Map
>     int size();
>     boolean isEmpty();
>     boolean containsKey(String key);
>     T get(String key);
>     Set<Map.Entry<String, T>> entrySet();
>     Set<String> keySet();
>     Collection<T> values();
>     void forEach(BiConsumer<String, T> action);
>
>     /**
>      * Performs the given action for each entry in this map until all entries
>      * have been processed or the action throws an exception.
>      *
>      * When all the entries have been processed, the action will be invoked
>      * for every new update that is received from the topic.
>      *
>      * @param action The action to be performed for each entry
>      */
>     void forEachAndListen(BiConsumer<String, T> action);
>
>     /**
>      * Closes the table view and releases the allocated resources.
>      *
>      * @return a future that can be used to track when the table view has
>      *         been closed
>      */
>     CompletableFuture<Void> closeAsync();
> }
> ```
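A minimal single-process sketch of the `forEachAndListen` contract from the Javadoc above, assuming updates arrive through a single `apply` hook (hypothetical, as is the class name `ListenableTable`; this is not the Pulsar implementation). Holding one lock across the scan and the listener registration is what keeps an update from landing in the gap between the two and being missed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory sketch of forEachAndListen semantics.
final class ListenableTable<T> {
    private final Map<String, T> data = new HashMap<>();
    private final List<BiConsumer<String, T>> listeners = new ArrayList<>();

    synchronized T get(String key) {
        return data.get(key);
    }

    // Scans the existing entries, then registers the action for future
    // updates. If the action throws during the scan, the exception
    // propagates and the action is never registered.
    synchronized void forEachAndListen(BiConsumer<String, T> action) {
        data.forEach(action);
        listeners.add(action);
    }

    // Called for each message received from the topic: update the map
    // first, then notify every registered listener with the new value.
    synchronized void apply(String key, T value) {
        data.put(key, value);
        for (BiConsumer<String, T> listener : listeners) {
            listener.accept(key, value);
        }
    }
}
```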
>
> ## Implementation
>
> The `TableView` will be implemented using multiple `Reader` instances, one
> per partition, and will always read starting from the compacted view of the
> topic.
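A rough sketch of the one-reader-per-partition layout, with plain threads standing in for `Reader` instances and lists standing in for partitions (`PartitionedTableSketch` and `partitionFor` are hypothetical). It assumes messages are routed to partitions by key, so all updates for a given key sit in one partition and are applied in order by a single thread, even though partitions are read concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PartitionedTableSketch {
    // Hypothetical key-based routing: the same key always maps to the same
    // partition, so per-key ordering depends on one partition only.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One "reader" thread per partition, all writing into one shared map.
    static Map<String, Integer> build(List<List<Map.Entry<String, Integer>>> partitions) {
        Map<String, Integer> table = new ConcurrentHashMap<>();
        List<Thread> readers = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            Thread reader = new Thread(() -> {
                for (Map.Entry<String, Integer> e : partition) {
                    table.put(e.getKey(), e.getValue());
                }
            });
            readers.add(reader);
            reader.start();
        }
        // Wait until every partition has been fully replayed.
        for (Thread reader : readers) {
            try {
                reader.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for readers", ie);
            }
        }
        return table;
    }
}
```

Updates for different keys may interleave in any order, so this only preserves per-key ordering, which is all a latest-value-per-key map needs.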
>
> The time it takes to create a table view can be controlled by configuring
> the topic compaction policies for the given topic or namespace. More
> frequent compaction can lead to very short startup times, since less data
> needs to be replayed to reconstruct the `TableView` of the topic.
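A small illustration of why compaction shortens startup, with in-memory lists standing in for the topic (`CompactionSketch`, `compact` and `replay` are hypothetical names). Compaction keeps only the newest message for each key, so replaying the compacted log rebuilds the same map from fewer messages. In Pulsar, a reader configured with `readCompacted(true)` reads the compacted data followed by any messages published after the last compaction; this sketch ignores that uncompacted tail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    // Keeps only the most recent entry for each key, preserving the
    // relative order of the surviving entries.
    static List<Map.Entry<String, Integer>> compact(List<Map.Entry<String, Integer>> log) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndex.put(log.get(i).getKey(), i);
        }
        List<Map.Entry<String, Integer>> compacted = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastIndex.get(log.get(i).getKey()) == i) {
                compacted.add(log.get(i));
            }
        }
        return compacted;
    }

    // Rebuilds the latest-value map by applying entries in order.
    static Map<String, Integer> replay(List<Map.Entry<String, Integer>> log) {
        Map<String, Integer> table = new HashMap<>();
        for (Map.Entry<String, Integer> e : log) {
            table.put(e.getKey(), e.getValue());
        }
        return table;
    }
}
```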
>


I think that this feature will add value to Pulsar.

Only one point:
do we need to add this to the Pulsar Client API, or can we simply add it as
an additional library, maintained in pulsar-adapters?

Adding this to the Pulsar Client would make the feature easier for users to
discover, but on the other hand it would only make sense to put it there if
it needs some additional support from the client itself.

I am thinking of doing the same for my other proposal about shared State
Objects (I will give it a PIP name soon :-)), that is, storing the code in
pulsar-adapters and not in the main Pulsar repo.



Enrico



>
> --
> Matteo Merli
> <ma...@gmail.com>
>