Posted to dev@pulsar.apache.org by dezhi liu <li...@gmail.com> on 2020/05/07 12:23:19 UTC

ReadOnly Topic Ownership Support

Hi all,
Here is a suggested PIP: ReadOnly Topic Ownership Support.
------------
# PIP-63: ReadOnly Topic Ownership Support

* Author: Penghui LI, Jia Zhai, Sijie Guo, Dezhi Liu

## Motivation
People usually use Pulsar as an event bus or event center to unify all of
their message and event data. The same set of event data is usually shared
across multiple applications, and problems occur when the number of
subscriptions on the same topic increases.

- The bandwidth of a broker limits the number of subscriptions for a single
topic.
- Subscriptions compete for network bandwidth on brokers, and different
subscriptions might have different levels of severity.
- When synchronizing message reads across cities, cross-city access needs to
be minimized.

This proposal adds readonly topic ownership support. If Pulsar supports
readonly ownership, users can set up one or a few separate broker clusters
that are readonly, in order to segregate consumption traffic by service
severity. This would also allow Pulsar to support a large number of
subscriptions.

## Changes
There are a few key changes for supporting readonly topic ownership.

- How does a readonly topic owner read data?
- How does a readonly topic owner keep metadata in sync?
- How does a readonly topic owner handle acknowledgments?

The first two problems have been well addressed in DistributedLog. We can
add similar features to the managed ledger.

### How a readonly topic owner reads data

In order for a readonly topic owner to keep reading data in a streaming
fashion, the managed ledger should be able to refresh its LAC
(LastAddConfirmed). The easiest change is to call `readLastAddConfirmedAsync`
when a cursor requests entries beyond the existing LAC. A more advanced
approach is to switch the regular read-entries requests to BookKeeper’s long
poll read requests. However, long poll read requests are not supported in the
BookKeeper v2 protocol.
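
To make the easier change concrete, here is a minimal sketch (not the actual
managed-ledger code) of refreshing the LAC before serving a read that goes
beyond the last known LAC. It uses BookKeeper’s
`ReadHandle.readLastAddConfirmedAsync`; the surrounding class and field names
are illustrative only.

```
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.client.api.ReadHandle;

// Illustrative helper: refresh the last-add-confirmed (LAC) of the ledger
// currently being tailed when a cursor asks for entries beyond the LAC we
// already know about. Class and field names are hypothetical.
class LacRefreshingReader {
    private final ReadHandle currentLedger; // handle of the ledger being tailed
    private volatile long knownLac;         // last LAC observed by this readonly owner

    LacRefreshingReader(ReadHandle currentLedger, long initialLac) {
        this.currentLedger = currentLedger;
        this.knownLac = initialLac;
    }

    /** Returns a future with a LAC that covers requestedEntryId, refreshing it if needed. */
    CompletableFuture<Long> ensureLacCovers(long requestedEntryId) {
        if (requestedEntryId <= knownLac) {
            return CompletableFuture.completedFuture(knownLac);
        }
        // Ask the bookies for the latest LAC before serving the read.
        return currentLedger.readLastAddConfirmedAsync()
                .thenApply(lac -> {
                    knownLac = Math.max(knownLac, lac);
                    return knownLac;
                });
    }
}
```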

Required Changes:

- Refresh LastAddConfirmed when a managed cursor requests entries beyond the
known LAC.
- Enable `explicitLac` at the managed ledger level, so the writable topic
owner periodically advances the LAC and the readonly owner is able to catch
up with the latest data (a minimal configuration sketch follows this list).
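
As a rough illustration of the second item, the writable owner’s BookKeeper
client can be configured to send explicit LAC updates periodically. This is
only a sketch: the property key below is an assumption and should be checked
against the `ClientConfiguration` of the BookKeeper release in use.

```
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;

public class ExplicitLacWriterConfig {
    public static void main(String[] args) throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setZkServers("zookeeper.cluster-a.example.com:2181");
        // Assumed property key: with a non-zero interval the BookKeeper client
        // periodically sends an explicit LAC, so readers can advance even when
        // no new entries are written. Verify the exact key for your release.
        conf.setProperty("explicitLacInterval", 1000); // milliseconds
        BookKeeper bk = new BookKeeper(conf);
        // ... the managed ledger would create its ledgers from this client ...
        bk.close();
    }
}
```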

### How a readonly topic owner keeps metadata in sync

Ledgers are rolled at a given interval, so a readonly topic owner needs a way
to learn that a ledger has been rolled. There are a couple of options, which
fall into two approaches: notification vs. polling.

*Notification*

A) Use a ZooKeeper watcher. The readonly topic owner sets a watcher on the
managed ledger’s metadata, so it is notified when a ledger is rolled.
B) Similar to A), introduce a “notification” request between the readonly
topic owner and the writable topic owner. The writable topic owner notifies
the readonly topic owner of metadata changes.

*Polling*

C) The readonly broker polls ZooKeeper for new metadata, *only* when the LAC
in the last ledger has not advanced for a given interval; it then checks
ZooKeeper to see whether a new ledger has been rolled.
D) The readonly broker polls for new metadata by reading events from a system
topic in the write broker cluster; the write broker publishes ledger-metadata
change events to that system topic whenever the managed ledger metadata is
updated.

Solution C) is the simplest to start with. A sketch of such a polling loop
follows.
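
The sketch below only illustrates the shape of option C), under the assumption
that the readonly owner has some way to read the current LAC and to re-read
the ledger list from ZooKeeper; the helper names are hypothetical, not
existing Pulsar classes.

```
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Illustrative poller for option C): only touch ZooKeeper when the LAC of the
// last known ledger has been flat for `staleMillis`, which suggests the
// writable owner may have rolled over to a new ledger.
class LedgerMetadataPoller {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private volatile long lastObservedLac = -1L;
    private volatile long lastAdvanceTime = System.currentTimeMillis();

    void start(long staleMillis, LongSupplier readCurrentLac, Runnable refreshLedgerListFromZk) {
        executor.scheduleAtFixedRate(() -> {
            long lac = readCurrentLac.getAsLong(); // LAC of the last known ledger
            long now = System.currentTimeMillis();
            if (lac > lastObservedLac) {
                lastObservedLac = lac;
                lastAdvanceTime = now;
            } else if (now - lastAdvanceTime >= staleMillis) {
                // LAC has not moved for a while: poll ZooKeeper once to check
                // for a newly rolled ledger instead of watching continuously.
                refreshLedgerListFromZk.run();
                lastAdvanceTime = now; // avoid hammering ZooKeeper
            }
        }, staleMillis, staleMillis, TimeUnit.MILLISECONDS);
    }
}
```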

### How a readonly topic owner handles acknowledgments

Currently Pulsar uses a centralized solution for managing cursors, and uses
cursors for managing data retention. This PIP does not change that solution.
Instead, the readonly topic owner only maintains a cursor cache; all actual
cursor updates are sent back to the writable topic owner.

This requires introducing a set of “cursor”-related RPCs between the writable
topic owner and the readonly topic owners.

- Read `Cursor` of a Subscription

The readonly topic owner will then handle the following requests using these
new cursor RPCs (a sketch of the forwarding flow follows the list):

- Subscribe: forward the subscribe request to the writable topic owner. Upon
a successful subscribe, the readonly topic owner caches the corresponding
cursor.
- Unsubscribe: remove the cursor from the cursor cache, and forward the
unsubscribe request to the writable topic owner.
- Consume: when a consumer connects, read the cursor from the writable topic
owner and cache it locally.
- Ack: forward the ack request to the writable topic owner, and update the
cursor locally in the cache.
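
The following sketch shows the shape of that forwarding flow on the readonly
owner. `WritableOwnerClient` and `CursorPosition` stand in for the new cursor
RPCs proposed above; they are hypothetical placeholders, not existing Pulsar
classes.

```
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative cursor cache on a readonly topic owner. All real cursor state
// stays on the writable owner; this cache only mirrors it.
class ReadonlyCursorCache {

    // Hypothetical client for the proposed cursor RPCs to the writable owner.
    interface WritableOwnerClient {
        CompletableFuture<CursorPosition> readCursor(String topic, String subscription);
        CompletableFuture<Void> forwardAck(String topic, String subscription, CursorPosition position);
    }

    static final class CursorPosition {
        final long ledgerId;
        final long entryId;
        CursorPosition(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    private final WritableOwnerClient writableOwner;
    private final Map<String, CursorPosition> cache = new ConcurrentHashMap<>();

    ReadonlyCursorCache(WritableOwnerClient writableOwner) {
        this.writableOwner = writableOwner;
    }

    /** Consume: read the cursor from the writable owner once, then serve it from the cache. */
    CompletableFuture<CursorPosition> getCursor(String topic, String subscription) {
        String key = topic + "/" + subscription;
        CursorPosition cached = cache.get(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return writableOwner.readCursor(topic, subscription)
                .thenApply(pos -> {
                    cache.put(key, pos);
                    return pos;
                });
    }

    /** Ack: forward to the writable owner, then update the local cache. */
    CompletableFuture<Void> ack(String topic, String subscription, CursorPosition position) {
        return writableOwner.forwardAck(topic, subscription, position)
                .thenRun(() -> cache.put(topic + "/" + subscription, position));
    }
}
```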

## Compatibility, Deprecation and Migration Plan
Since most of the changes are internal to the managed ledger, and this is a
new feature that does not change Pulsar’s wire protocol or public API, there
is no backward compatibility issue.

It is a newly added feature, so there is nothing to deprecate or migrate.

## Test Plan
- Unit tests for each individual change
- Integration tests for end-to-end pipeline
- Chaos testing to ensure correctness
- Load testing for ensuring performance

## Rejected Alternatives
### Use Geo Replication to replicate data between clusters

The simplest alternative solution would be to use Pulsar’s built-in
geo-replication mechanism to replicate data from one cluster to the other.

#### Two completely separated clusters

The idea is straightforward: you create two separate clusters, one for your
online services (`Cluster-A`) and the other for your analytical workloads
(`Cluster-B`). `Cluster-A` serves both write (produce) and read (consume)
traffic, while `Cluster-B` serves readonly (consume) traffic. Both `Cluster-A`
and `Cluster-B` have their own ZooKeeper cluster, BookKeeper cluster, and
brokers. In order to replicate a topic’s data between `Cluster-A` and
`Cluster-B`, the two clusters have to share the same configuration storage.
There are two approaches to do so:

a) A completely separate ZooKeeper cluster as configuration storage.

In this approach, everything is completely separated, so you can treat these
two clusters just as two different regions and follow the instructions in
[Pulsar geo-replication · Apache Pulsar](
http://pulsar.apache.org/docs/en/administration-geo/) to set up data
replication between the two clusters.

b) `ClusterB` and `ClusterA` share the same configuration storage.

The approach in a) requires setting up a separate ZooKeeper cluster as
configuration storage. Since `ClusterA` and `ClusterB` already have their own
ZooKeeper clusters, you may not want to set up yet another one. Instead, you
can let both `ClusterA` and `ClusterB` use `ClusterA`’s ZooKeeper cluster as
the configuration store, using ZooKeeper’s chroot mechanism to put the
configuration data under a separate root in `ClusterA`’s ZooKeeper cluster.

For example:

- Command to initialize `ClusterA`’s metadata

```
$ bin/pulsar initialize-cluster-metadata \
  --cluster ClusterA \
  --zookeeper zookeeper.cluster-a.example.com:2181 \
  --configuration-store zookeeper.cluster-a.example.com:2181/configuration-store \
  --web-service-url http://broker.cluster-a.example.com:8080/ \
  --broker-service-url pulsar://broker.cluster-a.example.com:6650/
```

- Command to initialize `ClusterB`’s metadata
```
$ bin/pulsar initialize-cluster-metadata \
  --cluster ClusterB \
  --zookeeper zookeeper.cluster-b.example.com:2181 \
  --configuration-store zookeeper.cluster-a.example.com:2181/configuration-store \
  --web-service-url http://broker.cluster-b.example.com:8080/ \
  --broker-service-url pulsar://broker.cluster-b.example.com:6650/
```

#### Shared bookkeeper and zookeeper cluster, but separated brokers

Sometimes it is unaffordable to have two completely separate clusters. You
might want to share the existing infrastructure, such as the data storage
(BookKeeper) and metadata storage (ZooKeeper). Similar to solution b)
described above, you can use a ZooKeeper chroot to achieve that.

Let’s assume there is only one ZooKeeper cluster and one BookKeeper cluster,
and the ZooKeeper cluster is `zookeeper.shared.example.com:2181`. You have two
clusters of brokers: one is `broker-a.example.com`, and the other is
`broker-b.example.com`. When you create the clusters, use
`zookeeper.shared.example.com:2181/configuration-store` as the shared
configuration storage, `zookeeper.shared.example.com:2181/cluster-a` for
`ClusterA`’s local metadata storage, and
`zookeeper.shared.example.com:2181/cluster-b` for `ClusterB`’s local metadata
storage.

This allows you to have two “broker-separated” clusters sharing the same
storage cluster (both ZooKeeper and BookKeeper).

No matter how the physical clusters are set up, there is a downside to using
geo-replication for isolating the online workloads from the analytics
workloads: data has to be replicated at least twice. If you have configured
Pulsar topics to store data in 3 replicas, you will end up with at least 6
copies of the data. So “geo-replication” might not be ideal for addressing
this use case.

------------


Thanks,

Dezhi

Re: ReadOnly Topic Ownership Support

Posted by Sijie Guo <gu...@gmail.com>.
If you have a topic that has a high fanout of subscriptions, the current
workaround is to increase the number of partitions, so the partitions can be
spread across brokers and you have more bandwidth to support the fanout
subscriptions.

Bypassing brokers to read directly from bookies is another approach for data
processing and analytical applications. The Presto integration works this
way, and we are adding similar batch read support in the Flink/Spark
integrations as well.

- Sijie


Re: ReadOnly Topic Ownership Support

Posted by Jason O'Gray <ja...@theograys.com>.
I'm curious what the current workaround is: 1) a partitioned topic to help
spread the load, or 2) reading directly from the bookies?

Best.


Re: ReadOnly Topic Ownership Support

Posted by Joe F <jo...@apache.org>.
Very useful feature.

I would like the proposers to think beyond just scaling consumers. If done
right, this has the potential to open up a lot of use cases in ML, where you
need to reprocess old/archived data. Being able to spin up a read-only broker
(dedicated brokers that read from tiered storage, without interfering with the
current flow of the stream) is extremely valuable. With small tweaks to this
PIP around data access boundaries, and without a lot of additional complexity,
that can be achieved.


Re: ReadOnly Topic Ownership Support

Posted by Jia Zhai <zh...@gmail.com>.
👍

Best Regards.


Jia Zhai

Beijing, China

Mobile: +86 15810491983




On Fri, May 8, 2020 at 4:29 AM Sijie Guo <gu...@gmail.com> wrote:

> Dezhi, thank you for sharing the proposal!
>
> It is great to see Tencent started contributing this great feature back to
> Pulsar! This feature will unlock a lot of new capabilities of Pulsar.
>
> I have moved the proposal to
>
> https://github.com/apache/pulsar/wiki/PIP-63:-Readonly-Topic-Ownership-Support
>
> - Sijie
>
>
> On Thu, May 7, 2020 at 5:23 AM dezhi liu <li...@gmail.com> wrote:
>
> > Hi all,
> > Here is a suggest (PIP) ReadOnly Topic Ownership Support
> > ------------
> > # PIP-63: ReadOnly Topic Ownership Support
> >
> > * Author: Penghui LI, Jia Zhai, Sijie Guo, Dezhi Liu
> >
> > ## Motivation
> > People usually use Pulsar as an event-bus or event center to unify all
> > their message data or event data.
> > One same set of event data will usually be shared across multiple
> > applications. Problems occur when the number of subscriptions of same
> topic
> > increased.
> >
> > - The bandwidth of a broker limits the number of subscriptions for a
> single
> > topic.
> > - Subscriptions are competing the network bandwidth on brokers. Different
> > subscription might have different level of severity.
> > - When synchronizing cross-city message reading, cross-city access needs
> to
> > be minimized.
> >
> > This proposal is proposing adding readonly topic ownership support. If
> > Pulsar supports readonly ownership, users can then use it to setup a
> (few)
> > separated broker clusters for readonly, to segregate the consumption
> > traffic by their service severity. And this would also allow Pulsar
> > supporting large number of subscriptions.
> >
> > ## Changes
> > There are a few key changes for supporting readonly topic ownership.
> >
> > - how does readonly topic owner read data
> > - how does readonly topic owner keep metadata in-sync
> > - how does readonly topic owner handle acknowledges
> >
> > The first two problems have been well addressed in DistributedLog. We can
> > just add similar features in managed ledger.
> >
> > ### How readonly topic owner read data
> >
> > In order for a readonly topic owner keep reading data in a streaming way,
> > the managed ledger should be able to refresh its LAC.  The easiest change
> > is to call `readLastAddConfirmedAsync` when a cursor requests entries
> > beyond existing LAC. A more advanced approach is to switch the regular
> read
> > entries request to bookkeeper’s long poll read requests. However long
> poll
> > read requests are not support in the bookkeeper v2 protocol.
> >
> > Required Changes:
> >
> > - Refresh LastAddConfirmed when a managed cursor requests entries beyond
> > known LAC.
> > - Enable `explicitLac` at managed ledger. So the topic writable owner
> will
> > periodically advance LAC, which will make sure readonly owner will be
> able
> > to catch with the latest data.
> >
> > ### How readonly topic owner keep metadata in-sync
> >
> > Ledgers are rolled at a given interval. Readonly topic owner should find
> a
> > way to know the ledgers has been rolled. There are a couple of options.
> > These options are categorized into two approaches : notification vs
> > polling.
> >
> > *Notification*
> >
> > A) use zookeeper watcher. Readonly topic owner will set a watcher at the
> > managed ledger’s metadata. So it will be notified when a ledger is
> rolled.
> > B) similar as A), introduce a “notification” request between readonly
> topic
> > owner and writable topic owner. Writable topic owner notifies readonly
> > topic owner with metadata changes.
> >
> > *Polling*
> >
> > C) Readonly Broker polling zookeeper to see if there is new metadata,
> > *only* when LAC in the last ledger has not been advanced for a given
> > interval. Readonly Broker checks zookeeper to see if there is a new
> ledger
> > rolled.
> > D)Readonly Broker polling new metadata by read events from system topic
> of
> > write broker cluster, write broker add the ledger meta change events to
> the
> > system topic when mledger metadata update.
> >
> > Solution C) will be the simplest solution to start with
> >
> > ### How does readonly topic owner handle acknowledges
> >
> > Currently Pulsar deploys a centralized solution for managing cursors and
> > use cursors for managing data retention. This PIP will not change this
> > solution. Instead, readonly topic owner will only maintains a cursor
> cache,
> > all the actual cursor updates will be sent back to the writable topic
> > owner.
> >
> > This requires introducing a set of “cursor” related RPCs between writable
> > topic owner and readonly topic owners.
> >
> > - Read `Cursor` of a Subscription
> >
> > So readonly topic owner will handle following requests using these new
> > cursor RPCs
> >
> > - Subscribe : forward the subscribe request to writable topic owner. Upon
> > successfully subscribe, readonly topic owner caches the corresponding
> > cursor.
> > - Unsubscribe: remove cursor from cursor cache, and forward the
> unsubscribe
> > request to writable topic owner.
> > - Consume: when a consumer is connected, it will then `read` the cursor
> > from writable topic owner and cache it locally.
> > - Ack: forward the ack request to the writable topic owner, and update
> the
> > cursor locally in the cache.
> >
> > ## Compatibility, Deprecation and Migration Plan
> > Since most of the changes are internally changes to managed ledger, and
> it
> > is a new feature which doesn’t change pulsar’s wire protocol and public
> > api. There is no backward compatibility  issue.
> >
> > It is a newly added feature. So there is nothing to deprecate or migrate.
> >
> > ## Test Plan
> > - Unit tests for each individual change
> > - Integration tests for end-to-end pipeline
> > - Chaos testing to ensure correctness
> > - Load testing for ensuring performance
> >
> > ## Rejected Alternatives
> > ### Use Geo Replication to replicate data between clusters
> >
> > A simplest alternative solution would be using Pulsar’s built-in
> > geo-replication mechanism to replicate data from one cluster to the other
> > cluster.
> >
> > #### Two completely separated clusters
> >
> > The idea is pretty straightforward - You created two separated clusters,
> > one cluster is for your online services -  `Cluster-A`, while the other
> > cluster is for your analytical workloads - `Cluster-B`.  `ClusterA` is
> used
> > for serving both write (produce) and read (consume) traffic, while
> > `ClusterB` is used for serving readonly (consume) traffic. Both
> `Cluster-A`
> > and `Cluster-B` have their own zookeeper cluster, bookkeeper cluster, and
> > brokers. In order to make sure a topic’s data can be replicated between
> > `Cluster-A` and `Cluster-B`, we need do make sure `Cluster-A` and
> > `Cluster-B` sharing same configuration storage. There are two approaches
> to
> > do so:
> >
> > a) a completely separated zookeeper cluster as configuration storage.
> >
> > In this approach, everything is completely separated. So you can treat
> > these two clusters just as two different regions, and follow the
> > instructions in [Pulsar geo-replication · Apache Pulsar](
> > http://pulsar.apache.org/docs/en/administration-geo/) to setup data
> > replication between these two clusters.
> >
> > b) `ClusterB` and `ClusterA` share the same configuration storage.
> >
> > The approach in a) requires setting up a separate zookeeper cluster as
> > the configuration storage. But since `ClusterA` and `ClusterB` already
> > have their own zookeeper clusters, you may not want to set up yet
> > another zookeeper cluster. You can let both `ClusterA` and `ClusterB`
> > use `ClusterA`’s zookeeper cluster as the configuration store. You can
> > achieve this by using zookeeper’s chroot mechanism to put the
> > configuration data under a separate root in `ClusterA`’s zookeeper
> > cluster.
> >
> > For example:
> >
> > - Command to initialize `ClusterA`’s metadata
> >
> > ```
> > $ bin/pulsar initialize-cluster-metadata \
> >   --cluster ClusterA \
> >   --zookeeper zookeeper.cluster-a.example.com:2181 \
> >   --configuration-store zookeeper.cluster-a.example.com:2181/configuration-store \
> >   --web-service-url http://broker.cluster-a.example.com:8080/ \
> >   --broker-service-url pulsar://broker.cluster-a.example.com:6650/
> > ```
> >
> > - Command to initialize `ClusterB`’s metadata
> >
> > ```
> > $ bin/pulsar initialize-cluster-metadata \
> >   --cluster ClusterB \
> >   --zookeeper zookeeper.cluster-b.example.com:2181 \
> >   --configuration-store zookeeper.cluster-a.example.com:2181/configuration-store \
> >   --web-service-url http://broker.cluster-b.example.com:8080/ \
> >   --broker-service-url pulsar://broker.cluster-b.example.com:6650/
> > ```
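> >
> > With both clusters registered against the shared configuration store,
> > replication is then enabled per namespace, following the geo-replication
> > instructions linked above. A minimal sketch using the Java admin client
> > (the tenant/namespace names are placeholders, and the exact admin API
> > signatures may differ between Pulsar releases):
> >
> > ```
> > import java.util.Arrays;
> > import java.util.HashSet;
> > import java.util.Set;
> >
> > import org.apache.pulsar.client.admin.PulsarAdmin;
> >
> > public class EnableReplication {
> >     public static void main(String[] args) throws Exception {
> >         // Talk to ClusterA's admin endpoint.
> >         PulsarAdmin admin = PulsarAdmin.builder()
> >                 .serviceHttpUrl("http://broker.cluster-a.example.com:8080")
> >                 .build();
> >
> >         // Placeholder namespace; its tenant must be allowed to use both clusters.
> >         String namespace = "my-tenant/my-namespace";
> >
> >         // Replicate every topic in this namespace to both clusters.
> >         Set<String> clusters = new HashSet<>(Arrays.asList("ClusterA", "ClusterB"));
> >         admin.namespaces().setNamespaceReplicationClusters(namespace, clusters);
> >
> >         admin.close();
> >     }
> > }
> > ```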
> >
> > #### Shared bookkeeper and zookeeper cluster, but separated brokers
> >
> > Sometimes it is unaffordable to have two completely separated clusters.
> > You might want to share the existing infrastructure, such as data
> > storage (bookkeeper) and metadata storage (zookeeper). Similar to the b)
> > solution described above, you can use a zookeeper chroot to achieve that.
> >
> > Let’s assume there is only one zookeeper cluster and one bookkeeper
> > cluster. The zookeeper cluster is `zookeeper.shared.example.com:2181`.
> > You have two clusters of brokers: one broker cluster is
> > `broker-a.example.com`, and the other broker cluster is
> > `broker-b.example.com`.
> > So when you create the clusters, you can use
> > `zookeeper.shared.example.com:2181/configuration-store` as the shared
> > configuration storage, use `zookeeper.shared.example.com:2181/cluster-a`
> > for `ClusterA`’s local metadata storage, and use
> > `zookeeper.shared.example.com:2181/cluster-b` for `ClusterB`’s local
> > metadata storage.
> >
> > This would allow you to have two “broker-separated” clusters sharing the
> > same storage cluster (both zookeeper and bookkeeper).
> >
> > No matter how the physical clusters are set up, there is a downside to
> > using geo-replication for isolating the online workloads from the
> > analytics workloads: data has to be replicated at least twice. If you
> > have configured pulsar topics to store data in 3 replicas, you will end
> > up with at least 6 copies of data. So “geo-replication” might not be
> > ideal for addressing this use case.
> >
> > ------------
> >
> >
> > Thanks,
> >
> > Dezhi
> >
>

Re: ReadOnly Topic Ownership Support

Posted by Sijie Guo <gu...@gmail.com>.
Dezhi, thank you for sharing the proposal!

It is great to see Tencent start contributing this great feature back to
Pulsar! This feature will unlock a lot of new capabilities for Pulsar.

I have moved the proposal to
https://github.com/apache/pulsar/wiki/PIP-63:-Readonly-Topic-Ownership-Support

- Sijie

