Posted to dev@pulsar.apache.org by Haiting Jiang <ji...@apache.org> on 2022/06/21 08:00:30 UTC

[DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Hi Pulsar community:

I have opened a PIP to discuss "Shadow Topic, an alternative way to support readonly topic ownership."

Proposal Link: https://github.com/apache/pulsar/issues/16153

---

## Motivation

The motivation is the same as PIP-63[1], with a new broadcast use case of
supporting 100K subscriptions in a single topic.
1. The bandwidth of a broker limits the number of subscriptions for a single
   topic.
2. Subscriptions are competing for the network bandwidth on brokers. Different
   subscriptions might have different levels of severity.
3. When synchronizing cross-city message reading, cross-city access needs to
   be minimized.
4. [New] Broadcast with 100K subscriptions. The number of subscriptions on a
   single topic is limited in practice. Hongjie from NTT Lab tested 40K
   subscriptions on a single topic: the clients needed about 20 minutes to
   establish all connections, and at a produce rate of 1 msg/s the average
   end-to-end latency was about 2.9s. With 100K subscriptions, the connection
   startup time and end-to-end latency are too high to be practical.

However, the original PIP-63 proposal is too complicated to implement. The
changed code is already over 3K lines (see PR#11960[2]), and some problems
remain:
1. The LAC in a readonly topic is updated by polling, which increases the
   load on bookies.
2. The message data of a readonly topic is not cached in the broker, which
   increases network usage between broker and bookie when more than one
   subscriber is tail-reading.
3. All subscriptions are managed in the original writable topic, so the
   maximum number of supported subscriptions does not scale.

This PIP tries to come up with a simpler solution to support readonly topic
ownership and solve the problems the previous PR left. The main idea of this
solution is to reuse the feature of geo-replication, but instead of
duplicating storage, it shares underlying bookie ledgers between different
topics.

## Goal

The goal is to introduce **Shadow Topic** as a new type of topic to support
readonly topic ownership. As its name implies, a shadow topic is the shadow
of a normal persistent topic (called the source topic here). The source topic
and the shadow topic must either have the same number of partitions or both
be non-partitioned. Multiple shadow topics can be created from one source
topic.

A shadow topic shares the underlying bookie ledgers of its source topic. Users
can't produce messages to a shadow topic directly, and a shadow topic doesn't
create any new ledgers for messages; all messages in a shadow topic come from
its source topic.

A shadow topic has its own subscriptions and doesn't share them with its
source topic. This means the shadow topic has its own cursor ledger to store
the persistent mark-delete info for each persistent subscription.

Messages are synced to a shadow topic by shadow replication, which is very
similar to geo-replication, with these differences:
1. Geo-replication only works between topics with the same name in different
   broker clusters. Shadow topics have no naming limitation and can be in the
   same cluster as the source topic.
2. Geo-replication duplicates data storage, but shadow topics don't.
3. Geo-replication is bidirectional: clusters replicate data to each other.
   Shadow replication only has a one-way data flow.


## API Changes

1. PulsarApi.proto.

A shadow topic needs to know the original message ID of each replicated
message in order to update the new ledger and LAC. So we need to add a
`shadow_message_id` field to CommandSend for the replicator.

```
message CommandSend {
    // ...
    // message id for shadow topic
    optional MessageIdData shadow_message_id = 9;
}
```

2. Admin API for creating a shadow topic from a source topic
```
   admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
```
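
As a rough illustration of the constraints from the Goal section (a shadow topic has the same partition layout as its source, and several shadow topics can hang off one source), here is a minimal in-memory sketch. It is not the Pulsar admin API: `ShadowTopicRegistrySketch` and all of its methods are hypothetical, and letting the shadow topic inherit the source's partition count is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; not the Pulsar admin API.
final class ShadowTopicRegistrySketch {
    // Topic name -> partition count (0 means non-partitioned).
    private final Map<String, Integer> partitions = new HashMap<>();
    // Source topic -> shadow topics created from it.
    private final Map<String, List<String>> shadows = new HashMap<>();

    void createTopic(String name, int partitionCount) {
        partitions.put(name, partitionCount);
    }

    // Creates a shadow topic with the same partition layout as its source (assumption).
    void createShadowTopic(String source, String shadow) {
        Integer sourcePartitions = partitions.get(source);
        if (sourcePartitions == null) {
            throw new IllegalArgumentException("source topic not found: " + source);
        }
        if (partitions.containsKey(shadow)) {
            throw new IllegalArgumentException("topic already exists: " + shadow);
        }
        partitions.put(shadow, sourcePartitions);
        shadows.computeIfAbsent(source, k -> new ArrayList<>()).add(shadow);
    }

    List<String> shadowsOf(String source) {
        return shadows.getOrDefault(source, Collections.<String>emptyList());
    }

    int partitionsOf(String topic) {
        return partitions.get(topic); // throws NullPointerException for unknown topics
    }
}
```

Calling `createShadowTopic` twice with the same source and different shadow names registers both shadows under that source; an unknown source is rejected.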

## Implementation

A diagram showing the relations between the key components has been added to
the GitHub issue [3].

There are two key changes in the implementation:
1. How to replicate messages to shadow topics.
2. How a shadow topic manages shared ledger info.

### 1. How to replicate messages to shadow topics.

This part is mostly implemented by `ShadowReplicator`, which extends
`PersistentReplicator` introduced in geo-replication. The shadow topic list
is added as a new topic policy of the source topic. The source topic manages
the lifecycle of all the replicators. The key is to add `shadow_message_id`
when producing messages to shadow topics.
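
The core of this step can be sketched in a few lines. This is a self-contained toy model, not Pulsar code: `ShadowMessageId`, `SendCommandSketch`, and `ShadowReplicatorSketch` are hypothetical stand-ins for `MessageIdData`, `CommandSend`, and `ShadowReplicator`. It only shows the idea of stamping every forwarded entry with its original ledger ID and entry ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for MessageIdData: where the entry lives in the source topic.
final class ShadowMessageId {
    final long ledgerId;
    final long entryId;

    ShadowMessageId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Hypothetical stand-in for CommandSend with the proposed optional field.
final class SendCommandSketch {
    final byte[] payload;
    final Optional<ShadowMessageId> shadowMessageId;

    SendCommandSketch(byte[] payload, Optional<ShadowMessageId> shadowMessageId) {
        this.payload = payload;
        this.shadowMessageId = shadowMessageId;
    }
}

// Toy replicator: records the commands it would send to one shadow topic.
final class ShadowReplicatorSketch {
    final List<SendCommandSketch> sent = new ArrayList<>();

    // Forwards one source entry, attaching its original position.
    void replicate(long ledgerId, long entryId, byte[] payload) {
        ShadowMessageId id = new ShadowMessageId(ledgerId, entryId);
        sent.add(new SendCommandSketch(payload, Optional.of(id)));
    }
}
```

A regular producer would leave the optional field empty, which is what lets the receiving side tell replicated messages apart from direct writes.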

### 2. How a shadow topic manages shared ledger info.

This part is mostly implemented by `ShadowManagedLedger`, which extends the
current `ManagedLedgerImpl` and overrides two key methods.

1. `initialize(..)`
a. Fetch the ManagedLedgerInfo of the source topic instead of the current
   shadow topic. The source topic name is stored in the topic policy of the
   shadow topic.
b. Open the last ledger and read the explicit LAC from the bookie, instead of
   creating a new ledger. Reading the LAC here requires that the source topic
   enables the explicit LAC feature by setting
   `bookkeeperExplicitLacIntervalInMills` to a non-zero value in broker.conf.
c. Do not start checkLedgerRollTask, which periodically tries to roll over the
   ledger.

2. `internalAsyncAddEntry()`
   Instead of writing entry data to the bookie, it only updates ledger
   metadata, such as `currentLedger` and `lastConfirmedEntry`, and puts the
   replicated message into `EntryCache`.
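
The add path described in item 2 might look roughly like the sketch below. It is a toy model under stated assumptions, not the real `ShadowManagedLedger`: `EntryPosition` and `ShadowLedgerSketch` are hypothetical, the entry cache is a plain map, and rejecting entries that carry no shadow message ID is one possible way to refuse direct writes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a (ledgerId, entryId) position in the source topic.
final class EntryPosition {
    final long ledgerId;
    final long entryId;

    EntryPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Toy model of the add path; it never writes to a bookie.
final class ShadowLedgerSketch {
    private long currentLedgerId = -1L;
    private EntryPosition lastConfirmedEntry = null;
    private final Map<String, byte[]> entryCache = new HashMap<>();

    // Accepts a replicated entry: updates metadata and fills the cache only.
    void addEntry(EntryPosition shadowMessageId, byte[] data) {
        if (shadowMessageId == null) {
            // No shadow message ID means the entry did not come from the replicator.
            throw new IllegalArgumentException("only replicated messages are accepted");
        }
        currentLedgerId = shadowMessageId.ledgerId;
        lastConfirmedEntry = shadowMessageId;
        entryCache.put(cacheKey(shadowMessageId.ledgerId, shadowMessageId.entryId), data);
    }

    long currentLedgerId() {
        return currentLedgerId;
    }

    EntryPosition lastConfirmedEntry() {
        return lastConfirmedEntry;
    }

    // Returns the cached payload, or null if the entry was never replicated here.
    byte[] cached(long ledgerId, long entryId) {
        return entryCache.get(cacheKey(ledgerId, entryId));
    }

    private static String cacheKey(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }
}
```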

Besides, some other problems need to be taken care of.
- Any ledger metadata update needs to be synced to the shadow topic, including
  ledger offloading and ledger deletion. The shadow topic needs to watch the
  ledger info updates in the metadata store and apply them in time.
- The locally cached LAC of `LedgerHandle` won't be updated in time, so we
  need to refresh the LAC when a managed cursor requests entries beyond the
  known LAC.
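
The second point, refreshing the LAC only when a read goes past what is cached locally, can be sketched as follows. This is a hypothetical model: `LacRefreshSketch` is not a Pulsar or BookKeeper class, and the `LongSupplier` stands in for whatever call actually reads the LAC from the bookie.

```java
import java.util.function.LongSupplier;

// Hypothetical model of a lazily refreshed last-add-confirmed (LAC) value.
final class LacRefreshSketch {
    private long knownLac;
    private final LongSupplier readLacFromBookie; // assumed hook for the remote read
    private int refreshCount = 0;

    LacRefreshSketch(long initialLac, LongSupplier readLacFromBookie) {
        this.knownLac = initialLac;
        this.readLacFromBookie = readLacFromBookie;
    }

    // Returns true if entryId is readable; refreshes the cached LAC only when needed.
    boolean canRead(long entryId) {
        if (entryId > knownLac) {
            knownLac = Math.max(knownLac, readLacFromBookie.getAsLong());
            refreshCount++;
        }
        return entryId <= knownLac;
    }

    int refreshCount() {
        return refreshCount;
    }
}
```

Reads at or below the cached LAC never trigger a remote call, so the extra bookie load is limited to cursors that are actually at the tail.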

## Rejected Alternatives

See PIP-63[1].

## References
[1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
[2] https://github.com/apache/pulsar/pull/11960 
[3] https://github.com/apache/pulsar/issues/16153


BR,
Haiting Jiang

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@gmail.com>.
Hi Penghui,

> Please also update the proposal for schema handling.

Done. All content discussed in this thread has been added to [1].

Thanks,
Haiting

[1] https://github.com/apache/pulsar/issues/16153

On Fri, Jul 29, 2022 at 1:19 PM PengHui Li <co...@gmail.com> wrote:

> Sounds good to me.
>
> Please also update the proposal for schema handling.
>
> Thanks,
> Penghui
>
> On Jul 24, 2022, 17:53 +0800, Haiting Jiang <ji...@apache.org>, wrote:
> > Hi Penghui,
> >
> > > One question about the schema.
> > > How can the consumer get the schema from the shadow topic during
> > > consumption?
> > > We should add this part in the proposal.
> >
> > Thanks for the reminder.
> >
> > From what I see, the schema is part of a topic's metadata. So a shadow
> > topic won't have its own schema; it shares the schema info of its source
> > topic.
> >
> > For consumers, we need to support the `GetSchema` command for shadow
> > topics, and there are two interfaces for this.
> >
> > 1. Binary protocol, where `CommandGetSchema` is handled in
> > `ServerCnx#handleGetSchema`. We only need to replace the `schemaName` of
> > the requested shadow topic with the `schemaName` of the source topic, and
> > the underlying read operation is supported by
> > `SchemaRegistry#getSchema(String, SchemaVersion)`.
> >
> > 2. HTTP protocol, which is handled in `SchemasResource#getSchema(...)`.
> > Similar to the approach in the binary protocol, replace the `schemaId`
> > with that of the source topic in `SchemasResourceBase#getSchemaId`.
> >
> > For admins, we can support other "read" ops besides `getSchema`,
> > including `getAllSchemas` and `getVersionBySchema`, which can all be
> > supported in the same way as `getSchema`.
> >
> > Thanks,
> > Haiting
> >
> > On 2022/07/21 02:13:08 PengHui Li wrote:
> > > Hi Haiting,
> > >
> > > One question about the schema.
> > > How can the consumer get the schema from the shadow topic during
> > > consumption?
> > > We should add this part in the proposal.
> > >
> > > Thanks,
> > > Penghui
> > >
> > > On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <as...@gmail.com> wrote:
> > >
> > > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <jianghaiting@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Asaf,
> > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this
> > > > > > change: The shadow topic doesn't really have its own messages, or
> > > > > > its own ledgers, right? When it reads messages, it reads from the
> > > > > > original topic ledgers. So the only thing you need to do is sync
> > > > > > the "metadata" - ledgers list?
> > > > >
> > > > > Yes, mostly the ledger ID list and the LAC of the last ledger.
> > > > >
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > > information from the original topic, without copying?
> > > > >
> > > > > Yes, old ledger information will be read from the metadata store
> > > > > when ShadowManagedLedger initializes. The replicator is only for new
> > > > > messages, to reduce the consume latency of subscriptions in the
> > > > > shadow topic. And the reason we also replicate message data is to
> > > > > populate the entry cache when the shadow topic has many active
> > > > > subscriptions.
> > > > >
> > > > > One optimization we can do: it does not help much for the shadow
> > > > > replicator to replicate messages in the backlog. We can come up with
> > > > > some policy to reset the shadow replicator cursor in a future PR.
> > > >
> > > > I'm not sure I'm following you.
> > > > What do you mean by old ledger information and new ledger information?
> > > >
> > > > What I'm trying to understand is: why do you need to copy the source
> > > > topic metadata: Ledgers ID list and LAC of the last ledger? Why can't
> > > > you just use the original topic metadata?
> > > >
> > > > > > Another question - I couldn't understand why you need to change
> > > > > > the protocol to introduce shadow message id. Can you please
> > > > > > explain that to me? Is CommandSend used only internally between
> > > > > > Pulsar Clusters or used by a Pulsar Client?
> > > > >
> > > > > CommandSend was designed for the Pulsar producer client first, and
> > > > > geo-replication reuses the producer client to replicate messages
> > > > > between Pulsar clusters.
> > > > >
> > > > > The shadow message ID contains the ledger ID and entry ID of the
> > > > > message. When the shadow topic receives the message ID, it is able
> > > > > to update `lastConfirmedEntry` directly, so that subscriptions can
> > > > > consume this new message.
> > > > > Also, the shadow topic can tell whether a message is from the shadow
> > > > > replicator and reject it otherwise.
> > > >
> > > > I think the flow of information is the part I don't understand.
> > > >
> > > > In the PIP you write "The message sync procedure of shadow topic is
> > > > supported by shadow replication, which is very like geo-replication,
> > > > with these differences:"
> > > > What I don't understand is that you write that this is a read-only
> > > > topic, so why replicate/sync messages?
> > > >
> > > > I managed to understand that you want to populate the BK entry cache
> > > > of the topic ledgers in the shadow topic broker. Instead of reading
> > > > from BK and storing it in the cache, you favor copying from the source
> > > > topic broker cache memory to the shadow topic broker cache. Is this to
> > > > save the bandwidth of BK? I presume the most recent messages of BK
> > > > would be in memory anyway, no?
> > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I did a quick reading and I couldn't understand the gist of this
> > > > > > change: The shadow topic doesn't really have its own messages, or
> > > > > > its own ledgers, right? When it reads messages, it reads from the
> > > > > > original topic ledgers. So the only thing you need to do is sync
> > > > > > the "metadata" - ledgers list?
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > > information from the original topic, without copying?
> > > > > >
> > > > > > Another question - I couldn't understand why you need to change
> > > > > > the protocol to introduce shadow message id. Can you please
> > > > > > explain that to me? Is CommandSend used only internally between
> > > > > > Pulsar Clusters or used by a Pulsar Client?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Asaf
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <
> > > > jianghaiting@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Pulsar community:
> > > > > > >
> > > > > > > I open a pip to discuss "Shadow Topic, an alternative way to
> support
> > > > > > > readonly topic ownership."
> > > > > > >
> > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > > ---
> > > > > > >
> > > > > > > ## Motivation
> > > > > > >
> > > > > > > The motivation is the same as PIP-63[1], with a new broadcast
> use
> > > > case
> > > > > of
> > > > > > > supporting 100K subscriptions in a single topic.
> > > > > > > 1. The bandwidth of a broker limits the number of
> subscriptions for a
> > > > > > > single
> > > > > > > topic.
> > > > > > > 2. Subscriptions are competing for the network bandwidth on
> brokers.
> > > > > > > Different
> > > > > > > subscriptions might have different levels of severity.
> > > > > > > 3. When synchronizing cross-city message reading, cross-city
> access
> > > > > needs
> > > > > > > to
> > > > > > > be minimized.
> > > > > > > 4. [New] Broadcast with 100K subscriptions. There is a
> limitation of
> > > > > the
> > > > > > > subscription number of a single topic. It's tested by Hongjie
> from
> > > > > NTT
> > > > > > > Lab
> > > > > > > that with 40K subscriptions in a single topic, the client needs
> > > > > about
> > > > > > > 20min
> > > > > > > to start all client connections, and under 1 msg/s message
> > > > producer
> > > > > > > rate,
> > > > > > > the average end to end latency is about 2.9s. And for 100K
> > > > > > > subscriptions,
> > > > > > > the time of start connection and E2E latency is beyond
> > > > > consideration.
> > > > > > >
> > > > > > > However, it's too complicated to implement with original PIP-63
> > > > > proposal,
> > > > > > > the
> > > > > > > changed code is already over 3K+ lines, see PR#11960[2], and
> there
> > > > are
> > > > > > > still
> > > > > > > some problems left,
> > > > > > > 1. The LAC in readonly topic is updated in a polling pattern,
> which
> > > > > > > increases
> > > > > > > the bookie load bookie.
> > > > > > > 2. The message data of readonly topic won't be cached in
> broker.
> > > > > Increase
> > > > > > > the
> > > > > > > network usage between broker and bookie when there are more
> than
> > > > one
> > > > > > > subscriber is tail-reading.
> > > > > > > 3. All the subscriptions is managed in original
> writable-topic, so
> > > > the
> > > > > > > support
> > > > > > > max subscription number is not scaleable.
> > > > > > >
> > > > > > > This PIP tries to come up with a simpler solution to support
> readonly
> > > > > topic
> > > > > > > ownership and solve the problems the previous PR left. The
> main idea
> > > > of
> > > > > > > this
> > > > > > > solution is to reuse the feature of geo-replication, but
> instead of
> > > > > > > duplicating storage, it shares underlying bookie ledgers
> between
> > > > > different
> > > > > > > topics.
> > > > > > >
> > > > > > > ## Goal
> > > > > > >
> > > > > > > The goal is to introduce **Shadow Topic** as a new type of
> topic to
> > > > > support
> > > > > > > readonly topic ownership. Just as its name implies, a shadow
> topic is
> > > > > the
> > > > > > > shadow of some normal persistent topic (let's call it source
> topic
> > > > > here).
> > > > > > > The
> > > > > > > source topic and the shadow topic must have the same number of
> > > > > partitions
> > > > > > > or
> > > > > > > both non-partitioned. Multiply shadow topics can be created
> from a
> > > > > source
> > > > > > > topic.
> > > > > > >
> > > > > > > Shadow topic shares the underlying bookie ledgers from its
> source
> > > > > topic.
> > > > > > > User
> > > > > > > can't produce any messages to shadow topic directly and shadow
> topic
> > > > > don't
> > > > > > > create any new ledger for messages, all messages in shadow
> topic come
> > > > > from
> > > > > > > source topic.
> > > > > > >
> > > > > > > Shadow topic have its own subscriptions and don't share with
> its
> > > > source
> > > > > > > topic.
> > > > > > > This means the shadow topic have its own cursor ledger to store
> > > > > persistent
> > > > > > > mark-delete info for each persistent subscriptions.
> > > > > > >
> > > > > > > The message sync procedure of shadow topic is supported by
> shadow
> > > > > > > replication,
> > > > > > > which is very like geo-replication, with these difference:
> > > > > > > 1. Geo-replication only works between topic with the same name
> in
> > > > > different
> > > > > > > broker clusters. But shadow topic have no naming limitation and
> > > > they
> > > > > > > can be
> > > > > > > in the same cluster.
> > > > > > > 2. Geo-replication duplicates data storage, but shadow topic
> don't.
> > > > > > > 3. Geo-replication replicates data from each other, it's
> > > > > bidirectional, but
> > > > > > > shadow replication only have one way data flow.
> > > > > > >
> > > > > > >
> > > > > > > ## API Changes
> > > > > > >
> > > > > > > 1. PulsarApi.proto.
> > > > > > >
> > > > > > > Shadow topic need to know the original message id of the
> replicated
> > > > > > > messages,
> > > > > > > in order to update new ledger and lac. So we need add a
> > > > > > > `shadow_message_id` in
> > > > > > > CommandSend for replicator.
> > > > > > >
> > > > > > > ```
> > > > > > > message CommandSend { // ... // message id for shadow topic
> optional
> > > > > > > MessageIdData shadow_message_id = 9; }
> > > > > > > ```
> > > > > > >
> > > > > > > 2. Admin API for creating shadow topic with source topic
> > > > > > > ```
> > > > > > > admin.topics().createShadowTopic(source-topic-name,
> > > > > shadow-topic-name)
> > > > > > > ```
> > > > > > >
> > > > > > > ## Implementation
> > > > > > >
> > > > > > > A picture showing key components relations is added in github
> issue
> > > > > [3].
> > > > > > >
> > > > > > > There are two key changes for implementation.
> > > > > > > 1. How to replicate messages to shadow topics.
> > > > > > > 2. How shadow topic manage shared ledgers info.
> > > > > > >
> > > > > > > ### 1. How to replicate messages to shadow topics.
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowReplicator`, which
> extends
> > > > > > > `PersistentReplicator` introduced in geo-replication. The
> shadow
> > > > topic
> > > > > list
> > > > > > > is added as a new topic policy of the source topic. Source
> topic
> > > > > manage the
> > > > > > > lifecycle of all the replicators. The key is to add
> > > > `shadow_message_id`
> > > > > > > when
> > > > > > > produce message to shadow topics.
> > > > > > >
> > > > > > > ### 2. How shadow topic manage shared ledgers info.
> > > > > > >
> > > > > > > This part is mostly implemented by `ShadowManagedLedger`, which
> > > > extends
> > > > > > > current `ManagedLedgerImpl` with two key override methods.
> > > > > > >
> > > > > > > 1. `initialize(..)`
> > > > > > > a. Fetch ManagedLedgerInfo of source topic instead of current
> shadow
> > > > > topic.
> > > > > > > The source topic name is stored in the topic policy of the
> shadow
> > > > > topic.
> > > > > > > b. Open the last ledger and read the explicit LAC from bookie,
> > > > instead
> > > > > of
> > > > > > > creating new ledger. Reading LAC here requires that the source
> > > > topic
> > > > > > > must
> > > > > > > enable explicit LAC feature by set
> > > > > > > `bookkeeperExplicitLacIntervalInMills`
> > > > > > > to non-zero value in broker.conf.
> > > > > > > c. Do not start checkLedgerRollTask, which tries roll over
> ledger
> > > > > > > periodically
> > > > > > >
> > > > > > > 2. `internalAsyncAddEntry()` Instead of write entry data to
> bookie,
> > > > It
> > > > > only
> > > > > > > update metadata of ledgers, like `currentLedger`,
> > > > > `lastConfirmedEntry`
> > > > > > > and
> > > > > > > put the replicated message into `EntryCache`.
> > > > > > >
> > > > > > > Besides, some other problems need to be taken care of.
> > > > > > > - Any ledger metadata updates need to be synced to shadow
> topic,
> > > > > including
> > > > > > > ledger offloading or ledger deletion. Shadow topic needs to
> watch
> > > > the
> > > > > > > ledger
> > > > > > > info updates with metadata store and update in time.
> > > > > > > - The local cached LAC of `LedgerHandle` won't updated in
> time, so we
> > > > > need
> > > > > > > refresh LAC when a managed cursor requests entries beyond known
> > > > LAC.
> > > > > > >
> > > > > > > ## Reject Alternatives
> > > > > > >
> > > > > > > See PIP-63[1].
> > > > > > >
> > > > > > > ## Reference
> > > > > > > [1]
> > > > > > >
> > > > >
> > > >
> https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > > > > > > [2] https://github.com/apache/pulsar/pull/11960
> > > > > > > [3] https://github.com/apache/pulsar/issues/16153
> > > > > > >
> > > > > > >
> > > > > > > BR,
> > > > > > > Haiting Jiang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by PengHui Li <co...@gmail.com>.
Sounds good to me.

Please also update the proposal for schema handling.

Thanks,
Penghui
On Jul 24, 2022, 17:53 +0800, Haiting Jiang <ji...@apache.org>, wrote:
> Hi Penghui,
>
> > One question about the schema.
> > How can the consumer get the schema from the shadow topic during
> > consumption?
> > We should add this part in the proposal.
> >
>
> Thanks for the reminding.
>
> From what I see, Schema is part of a topic's metadata. So shadow topic won't
> have it's own schema, but it shares the schema info of source topic.
>
> For consumers, we need to suppoort `GetSchema` command for shadow topic, and there are
> two interface for this.
>
> 1. Binary protocol, which handles in `CommandGetSchema` in
> `ServerCnx#handleGetSchema`. We only need to replace the requested shadow
> topic 's `schemaName` to the `schemaName` of source topic, and the
> underlying read operation is supported by
> `SchemaRegistry#getSchema(String, SchemaVersion)`.
>
> 2. HTTP protocol, which handles in `SchemasResource#getSchema(...)`. Similar
> with the approach in binary protocol, replace the `schemaId` with source
> topic in `SchemasResourceBase#getSchemaId`.
>
> For admins, we can support other "read" ops besides `getSchema`, including
> `getAllSchemas` and `getVersionBySchema`, which all can be supported by the
> same way as `getSchema`.
>
>
> Thanks,
> Haiting
>
>
> On 2022/07/21 02:13:08 PengHui Li wrote:
> > Hi Haiting,
> >
> > One question about the schema.
> > How can the consumer get the schema from the shadow topic during
> > consumption?
> > We should add this part in the proposal.
> >
> > Thanks,
> > Penghui
> >
> > On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <as...@gmail.com> wrote:
> >
> > > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <ji...@apache.org>
> > > wrote:
> > >
> > > > Hi Asaf,
> > > >
> > > > > I did a quick reading and I couldn't understand the gist of this
> > > change:
> > > > > The shadow topic doesn't really have it's own messages, or it's own
> > > > ledgers
> > > > > right? When it reads messages, it reads from the original topic
> > > ledgers.
> > > > So
> > > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > > >
> > > > Yes, mostly ledger id list and LAC of the last ledger.
> > >
> > >
> > > > > One question comes to mind here: Why not simply read the ledger
> > > > information
> > > > > from original topic, without copy?
> > > >
> > > > Yes, old ledger information will be read from metadata store when
> > > > ShadowManagedLedger initializes. The replicator is only for new messages,
> > > > to
> > > > reduce the consume latency of subscription in shadow topic. And the
> > > reason
> > > > we also replicates message data is to populates the entry cache when
> > > shadow
> > > > topic have many active subscriptions.
> > > >
> > > > One optimization we can do is that, there would be not much help for
> > > shadow
> > > > replicator to replicate message in backlog. We can come up with some
> > > > policy to
> > > > reset shadow replicator cursor in future PR.
> > > >
> > >
> > > I'm not sure I'm following you.
> > > What do you mean by old ledger information and new ledger information?
> > >
> > > What I'm trying to understand is: why do you need to copy the source topic
> > > metadata: Ledgers ID list and LAC of the last ledger? Why can't you just
> > > use the original topic metadata?
> > >
> > >
> > >
> > > >
> > > > > Another question - I couldn't understand why you need to change the
> > > > > protocol to introduce shadow message id. Can you please explain that to
> > > > me?
> > > > > Is CommandSend used only internally between Pulsar Clusters or used by
> > > a
> > > > > Pulsar Client?
> > > >
> > > > CommandSend is designed for pulsar producer client first, and
> > > > geo-replication
> > > > reuse producer client to replicate messages between pulsar clusters.
> > > >
> > > > The shadow message id contains the ledger id and entry id of this
> > > message.
> > > > When shadow topic receive the message id, it is able to update
> > > > `lastConfirmedEntry` directly, so that subscription can consume this this
> > > > new
> > > > message.
> > > > Also shadow topic can tell if the message is from shadow replicator and
> > > > reject
> > > > otherwise.
> > > >
> > > >
> > > I think the flow of information is the part I don't understand.
> > >
> > > In the PIP you write "The message sync procedure of shadow topic is
> > > supported by shadow replication, which is very like geo-replication, with
> > > these differences:"
> > > What I don't understand is that you write that this is a read-only topic,
> > > so why replicate/sync messages?
> > >
> > > I managed to understand that you want to populate the BK entry cache of the
> > > topic ledgers in the shadow topic broker. Instead of reading from BK and
> > > storing it in the cache, you favor copying from the source topic broker
> > > cache memory to the shadow topic broker cache. Is this to save the
> > > bandwidth of BK? I presume the most recent messages of BK would be in
> > > memory anyway, no?
> > >
> > >
> > >
> > >
> > > > Thanks,
> > > > Haiting
> > > >
> > > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > > Hi,
> > > > >
> > > > > I did a quick reading and I couldn't understand the gist of this
> > > change:
> > > > > The shadow topic doesn't really have it's own messages, or it's own
> > > > ledgers
> > > > > right? When it reads messages, it reads from the original topic
> > > ledgers.
> > > > So
> > > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > > > > One question comes to mind here: Why not simply read the ledger
> > > > information
> > > > > from original topic, without copy?
> > > > >
> > > > > Another question - I couldn't understand why you need to change the
> > > > > protocol to introduce shadow message id. Can you please explain that to
> > > > me?
> > > > > Is CommandSend used only internally between Pulsar Clusters or used by
> > > a
> > > > > Pulsar Client?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Asaf
> > > > >
> > > > > On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <
> > > jianghaiting@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Pulsar community:
> > > > > >
> > > > > > I open a pip to discuss "Shadow Topic, an alternative way to support
> > > > > > readonly topic ownership."
> > > > > >
> > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > > > > >
> > > > > > ---
> > > > > >
> > > > > > ## Motivation
> > > > > >
> > > > > > The motivation is the same as PIP-63[1], with a new broadcast use
> > > case
> > > > of
> > > > > > supporting 100K subscriptions in a single topic.
> > > > > > 1. The bandwidth of a broker limits the number of subscriptions for a
> > > > > > single
> > > > > > topic.
> > > > > > 2. Subscriptions are competing for the network bandwidth on brokers.
> > > > > > Different
> > > > > > subscriptions might have different levels of severity.
> > > > > > 3. When synchronizing cross-city message reading, cross-city access
> > > > needs
> > > > > > to
> > > > > > be minimized.
> > > > > > 4. [New] Broadcast with 100K subscriptions. There is a limitation of
> > > > the
> > > > > > subscription number of a single topic. It's tested by Hongjie from
> > > > NTT
> > > > > > Lab
> > > > > > that with 40K subscriptions in a single topic, the client needs
> > > > about
> > > > > > 20min
> > > > > > to start all client connections, and under 1 msg/s message
> > > producer
> > > > > > rate,
> > > > > > the average end to end latency is about 2.9s. And for 100K
> > > > > > subscriptions,
> > > > > > the time of start connection and E2E latency is beyond
> > > > consideration.

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Penghui,

> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
> 

Thanks for the reminder.

From what I see, the schema is part of a topic's metadata. So a shadow topic
won't have its own schema; it shares the schema info of its source topic.

For consumers, we need to support the `GetSchema` command for shadow topics,
and there are two interfaces for this.

1. Binary protocol, which handles `CommandGetSchema` in
   `ServerCnx#handleGetSchema`. We only need to replace the requested shadow
   topic's `schemaName` with the `schemaName` of the source topic; the
   underlying read operation is supported by
   `SchemaRegistry#getSchema(String, SchemaVersion)`.

2. HTTP protocol, which is handled in `SchemasResource#getSchema(...)`.
   Similar to the approach for the binary protocol, replace the `schemaId`
   with that of the source topic in `SchemasResourceBase#getSchemaId`.

For admins, we can support other "read" operations besides `getSchema`,
including `getAllSchemas` and `getVersionBySchema`, all of which can be
supported in the same way as `getSchema`.
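
To illustrate the substitution described above, here is a minimal sketch in
plain Java. It is not Pulsar code: `ShadowSchemaNameResolver`, its map of
shadow-to-source topics, and the assumed schema-name format (the topic name
without its `persistent://` prefix) are all hypothetical and only show the
idea of resolving a shadow topic to its source topic before the schema lookup.

```java
import java.util.Map;

/**
 * Hypothetical helper, not part of Pulsar: resolves the schema name to read
 * for a topic, redirecting shadow topics to their source topic.
 */
final class ShadowSchemaNameResolver {
    // shadow topic name -> source topic name. In the proposal this relation
    // is stored in the topic policy of the shadow topic; a map stands in here.
    private final Map<String, String> shadowToSource;

    ShadowSchemaNameResolver(Map<String, String> shadowToSource) {
        this.shadowToSource = Map.copyOf(shadowToSource);
    }

    /** Returns the schema name to look up for the requested topic. */
    String schemaNameFor(String requestedTopic) {
        // A topic that is not a known shadow topic resolves to itself.
        String effectiveTopic =
                shadowToSource.getOrDefault(requestedTopic, requestedTopic);
        return toSchemaName(effectiveTopic);
    }

    /** Assumed format: "persistent://tenant/ns/topic" -> "tenant/ns/topic". */
    static String toSchemaName(String topic) {
        int idx = topic.indexOf("://");
        return idx < 0 ? topic : topic.substring(idx + 3);
    }

    public static void main(String[] args) {
        ShadowSchemaNameResolver resolver = new ShadowSchemaNameResolver(
                Map.of("persistent://public/default/shadow-a",
                       "persistent://public/default/source"));
        // Shadow topic: the lookup is redirected to the source topic's schema.
        System.out.println(
                resolver.schemaNameFor("persistent://public/default/shadow-a"));
        // Ordinary topic: the lookup is unchanged.
        System.out.println(
                resolver.schemaNameFor("persistent://public/default/other"));
    }
}
```

Both the binary and the HTTP path would apply the same kind of mapping before
calling into the schema registry, so the read path itself stays unchanged.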


Thanks,
Haiting


On 2022/07/21 02:13:08 PengHui Li wrote:
> Hi Haiting,
> 
> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
> 
> Thanks,
> Penghui
> 

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by PengHui Li <pe...@apache.org>.
Hi Haiting,

One question about the schema.
How can the consumer get the schema from the shadow topic during
consumption?
We should add this part in the proposal.

Thanks,
Penghui

On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika <as...@gmail.com> wrote:

> On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <ji...@apache.org>
> wrote:
>
> > Hi Asaf,
> >
> > > I did a quick reading and I couldn't understand the gist of this change:
> > > The shadow topic doesn't really have its own messages, or its own ledgers,
> > > right? When it reads messages, it reads from the original topic ledgers.
> > > So the only thing you need to do is sync the "metadata" - ledgers list?
> >
> > Yes, mostly ledger id list and LAC of the last ledger.
>
>
> > > One question comes to mind here: Why not simply read the ledger
> > > information from the original topic, without a copy?
> >
> > Yes, old ledger information will be read from the metadata store when
> > ShadowManagedLedger initializes. The replicator is only for new messages,
> > to reduce the consumption latency of subscriptions on the shadow topic.
> > The reason we also replicate message data is to populate the entry cache
> > when the shadow topic has many active subscriptions.
> >
> > One possible optimization: replicating backlog messages through the shadow
> > replicator would not help much, so we can come up with a policy to reset
> > the shadow replicator cursor in a future PR.
> >
>
> I'm not sure I'm following you.
> What do you mean by old ledger information and new ledger information?
>
> What I'm trying to understand is: why do you need to copy the source topic
> metadata: Ledgers ID list and LAC of the last ledger? Why can't you just
> use the original topic metadata?
>
>
>
> >
> > > Another question - I couldn't understand why you need to change the
> > > protocol to introduce shadow message id. Can you please explain that to me?
> > > Is CommandSend used only internally between Pulsar Clusters or used by a
> > > Pulsar Client?
> >
> > CommandSend is designed for the Pulsar producer client first, and
> > geo-replication reuses the producer client to replicate messages between
> > Pulsar clusters.
> >
> > The shadow message id contains the ledger id and entry id of this message.
> > When the shadow topic receives the message id, it is able to update
> > `lastConfirmedEntry` directly, so that subscriptions can consume this new
> > message.
> > Also, the shadow topic can tell whether a message is from the shadow
> > replicator and reject it otherwise.
> >
> >
> I think the flow of information is the part I don't understand.
>
> In the PIP you write "The message sync procedure of shadow topic is
> supported by shadow replication, which is very like geo-replication, with
> these differences:"
> What I don't understand is that you write that this is a read-only topic,
> so why replicate/sync messages?
>
> I managed to understand that you want to populate the BK entry cache of the
> topic ledgers in the shadow topic broker. Instead of reading from BK and
> storing it in the cache, you favor copying from the source topic broker
> cache memory to the shadow topic broker cache. Is this to save the
> bandwidth of BK? I presume the most recent messages of BK would be in
> memory anyway, no?
>
>
>
>
> > Thanks,
> > Haiting

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Asaf Mesika <as...@gmail.com>.
On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang <ji...@apache.org>
wrote:

> Hi Asaf,
>
> > I did a quick reading and I couldn't understand the gist of this change:
> > The shadow topic doesn't really have its own messages, or its own ledgers,
> > right? When it reads messages, it reads from the original topic ledgers.
> > So the only thing you need to do is sync the "metadata" - ledgers list?
>
> Yes, mostly ledger id list and LAC of the last ledger.


> > One question comes to mind here: Why not simply read the ledger
> > information from the original topic, without a copy?
>
> Yes, old ledger information will be read from the metadata store when
> ShadowManagedLedger initializes. The replicator is only for new messages,
> to reduce the consumption latency of subscriptions on the shadow topic.
> The reason we also replicate message data is to populate the entry cache
> when the shadow topic has many active subscriptions.
>
> One possible optimization: replicating backlog messages through the shadow
> replicator would not help much, so we can come up with a policy to reset
> the shadow replicator cursor in a future PR.
>

I'm not sure I'm following you.
What do you mean by old ledger information and new ledger information?

What I'm trying to understand is: why do you need to copy the source topic
metadata: Ledgers ID list and LAC of the last ledger? Why can't you just
use the original topic metadata?



>
> > Another question - I couldn't understand why you need to change the
> > protocol to introduce shadow message id. Can you please explain that to
> > me? Is CommandSend used only internally between Pulsar Clusters or used
> > by a Pulsar Client?
>
> CommandSend is designed for the Pulsar producer client first, and
> geo-replication reuses the producer client to replicate messages between
> Pulsar clusters.
>
> The shadow message id contains the ledger id and entry id of this message.
> When the shadow topic receives the message id, it is able to update
> `lastConfirmedEntry` directly, so that subscriptions can consume this new
> message.
> Also, the shadow topic can tell whether a message is from the shadow
> replicator and reject it otherwise.
>
>
I think the flow of information is the part I don't understand.

In the PIP you write "The message sync procedure of shadow topic is
supported by shadow replication, which is very like geo-replication, with
these differences:"
What I don't understand is that you write that this is a read-only topic,
so why replicate/sync messages?

I managed to understand that you want to populate the BK entry cache of the
topic ledgers in the shadow topic broker. Instead of reading from BK and
storing it in the cache, you favor copying from the source topic broker
cache memory to the shadow topic broker cache. Is this to save the
bandwidth of BK? I presume the most recent messages of BK would be in
memory anyway, no?




> Thanks,
> Haiting
>

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Asaf,

> I did a quick reading and I couldn't understand the gist of this change:
> The shadow topic doesn't really have its own messages, or its own ledgers,
> right? When it reads messages, it reads from the original topic ledgers. So
> the only thing you need to do is sync the "metadata" - ledgers list?

Yes, mostly ledger id list and LAC of the last ledger.

> One question comes to mind here: Why not simply read the ledger information
> from original topic, without copy?

Yes, old ledger information will be read from the metadata store when
ShadowManagedLedger initializes. The replicator is only for new messages, to
reduce the consume latency of subscriptions in the shadow topic. And the reason
we also replicate message data is to populate the entry cache when the shadow
topic has many active subscriptions.

One optimization we can make: replicating messages that are already in the
backlog does not help much, so we can come up with some policy to reset the
shadow replicator cursor in a future PR.
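
As a rough illustration of the cache argument above (this is not the actual
`ShadowManagedLedger` or `EntryCache` code), the following Java sketch models
a shadow topic broker with a tiny in-memory cache. All names in it
(`EntryCacheSketch`, `onReplicated`, `read`, `bookieReads`) are hypothetical,
and it assumes that every cache miss costs exactly one bookie read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Pulsar code: counts reads that fall through to the bookie.
final class EntryCacheSketch {
    private final Map<String, byte[]> cache = new HashMap<>();
    private int bookieReads = 0;

    // Assumed hook: invoked when the shadow replicator delivers a new entry.
    void onReplicated(long ledgerId, long entryId, byte[] payload) {
        cache.put(key(ledgerId, entryId), payload);
    }

    // One call per subscription that dispatches this entry.
    byte[] read(long ledgerId, long entryId) {
        byte[] cached = cache.get(key(ledgerId, entryId));
        if (cached != null) {
            return cached; // served from broker memory
        }
        bookieReads++;
        return new byte[0]; // stand-in for the data a bookie read would return
    }

    int bookieReads() {
        return bookieReads;
    }

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    public static void main(String[] args) {
        EntryCacheSketch broker = new EntryCacheSketch();
        broker.onReplicated(7L, 0L, "hello".getBytes());
        for (int sub = 0; sub < 1000; sub++) {
            broker.read(7L, 0L); // tail reads by many subscriptions
        }
        broker.read(3L, 42L); // backlog entry that was never replicated
        System.out.println("bookie reads: " + broker.bookieReads());
    }
}
```

In this model the 1000 tail reads never reach the bookie, while the single
backlog read does, which is why replicating backlog messages buys little.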

> Another question - I couldn't understand why you need to change the
> protocol to introduce shadow message id. Can you please explain that to me?
> Is CommandSend used only internally between Pulsar Clusters or used by a
> Pulsar Client?

CommandSend is designed for the Pulsar producer client first, and
geo-replication reuses the producer client to replicate messages between
Pulsar clusters.

The shadow message id contains the ledger id and entry id of this message.
When the shadow topic receives the message id, it is able to update
`lastConfirmedEntry` directly, so that subscriptions can consume this new
message.
Also, the shadow topic can tell whether a message is from the shadow
replicator and reject it otherwise.
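
To make the role of the shadow message id more concrete, here is a minimal
Java sketch of the bookkeeping described above. It is only a model under
stated assumptions, not the proposed `ShadowManagedLedger`: the class
`ShadowLedgerSketch`, its `onSend` method and the `ledgerListRefreshes`
counter are hypothetical names, and a ledger id change simply stands in for
refreshing the ledger list from the metadata store.

```java
// Hypothetical model of the shadow topic's bookkeeping, not Pulsar code.
final class ShadowLedgerSketch {
    private long currentLedgerId = -1L;
    private long lastConfirmedEntryId = -1L;
    private int ledgerListRefreshes = 0;

    // hasShadowMessageId is false for a send that did not come from the
    // shadow replicator (for example, a normal producer).
    boolean onSend(boolean hasShadowMessageId, long ledgerId, long entryId) {
        if (!hasShadowMessageId) {
            return false; // reject: direct produce to a shadow topic is not allowed
        }
        if (ledgerId != currentLedgerId) {
            // The source topic moved to another ledger. In the real design this
            // would mean re-reading the ledger list and opening the new ledger.
            ledgerListRefreshes++;
            currentLedgerId = ledgerId;
        }
        // No bookie write here: only the known last confirmed entry advances.
        lastConfirmedEntryId = entryId;
        return true;
    }

    long currentLedgerId() {
        return currentLedgerId;
    }

    long lastConfirmedEntryId() {
        return lastConfirmedEntryId;
    }

    int ledgerListRefreshes() {
        return ledgerListRefreshes;
    }

    public static void main(String[] args) {
        ShadowLedgerSketch shadow = new ShadowLedgerSketch();
        System.out.println("direct produce accepted: " + shadow.onSend(false, 0L, 0L));
        shadow.onSend(true, 10L, 0L);
        shadow.onSend(true, 10L, 1L);
        shadow.onSend(true, 11L, 0L); // source topic rolled over to ledger 11
        System.out.println("last confirmed: " + shadow.currentLedgerId()
                + ":" + shadow.lastConfirmedEntryId());
    }
}
```

The point of the sketch is that the id carried by the replicated message is
enough to advance the readable position without polling the bookie for LAC.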

Thanks, 
Haiting


Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Asaf Mesika <as...@gmail.com>.
Hi Haiting,

I've noticed Shadow Topic is *not* covered in the documentation.

Can you please add documentation for it?

It's such a great feature, it's almost a waste not having it documented.

Once it is, I can add it to the proud list of features Pulsar has on this
lovely page we recently launched: https://pulsar.apache.org/features/


On Fri, Jul 29, 2022 at 12:29 PM Haiting Jiang <ji...@gmail.com>
wrote:

> > I think that there are still some references to shadow_message_id but
> > IIUC we don't need to add that field anymore because we are going to
> > use
>
>
> > optional MessageIdData message_id = 9;
>
>
> > is this correct ?
>
>
> Yes, no more `shadow_message_id`. Sorry I missed these references, already
> updated.
>
> Thanks,
> Haiting
>
> On Fri, Jul 29, 2022 at 5:15 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > Il giorno ven 29 lug 2022 alle ore 09:05 Haiting Jiang
> > <ji...@apache.org> ha scritto:
> > >
> > > Hi Enrico,
> > >
> > > Any further suggestion on this PIP?
> > > If not, I would like to raise a  revote on this in a few days.
> >
> > now the PIP LGTM
> >
> > I think that there are still some references to shadow_message_id but
> > IIUC we don't need to add that field anymore because we are going to
> > use
> >
> > optional MessageIdData message_id = 9;
> >
> > is this correct ?
> >
> > Enrico
> >
> > >
> > > Thanks,
> > > Haiting
> > >
> > > On 2022/07/07 11:30:59 Haiting Jiang wrote:
> > > > Hi Enrico,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > On 2022/07/05 08:03:43 Enrico Olivelli wrote:
> > > > > I have a couple of additional questions.
> > > > >
> > > > > 1. Security
> > > > > What about security permissions about the shadow topic ?
> > > > > We are reading from another topic.
> > > > > I think we must clarify the decisions in the PIP
> > > >
> > > > As the shadow topic is usually in another namespace, it would have its
> > > > own independent permission settings, and we can configure different
> > > > permissions for the source topic and the shadow topic. So there would be
> > > > no guarantee that you are allowed to consume the shadow topic if you have
> > > > permission to consume the source topic.
> > > >
> > > > On the other hand, we use topic policy to store shadow topic settings,
> > > > so a new policy permission item needs to be added as
> > > > PolicyName.SHADOW_TOPIC, and the user must have PolicyOperation.WRITE on
> > > > this policy to create/delete shadow topics.
> > > >
> > > > >
> > > > > 2. Truncation and deletion
> > > > > What happens when you truncate or delete the source topic ?
> > > > > please add a paragraph on the PIP
> > > > >
> > > >
> > > > 1. Truncation, from command `bin/pulsar-admin topics truncate
> > > > source-topic`.
> > > > For source topic truncation, nothing changes. It still moves all cursors
> > > > to the end of the topic and deletes all inactive ledgers.
> > > > As the shadow topic will watch `ManagedLedgerInfo` in the metadata store,
> > > > once it knows the ledgers are deleted, all cursors will skip the deleted
> > > > ledgers.
> > > >
> > > > 2. Deletion, from command `bin/pulsar-admin topics delete source-topic`.
> > > > Like geo-replication, topic deletion is forbidden if the topic has shadow
> > > > replicators; users have to delete the shadow topics first. Here is the
> > > > new admin API for managing shadow topics with source topic in
> > > > `org.apache.pulsar.client.admin.Topics`:
> > > > ```
> > > > void createShadowTopic(String sourceTopicName, String shadowTopicName);
> > > > void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
> > > > List<String> getShadowTopics(String sourceTopicName);
> > > >
> > > > // And their async version methods.
> > > > ```
> > > > And this requires new REST interfaces in the admin server, where
> > > > ```
> > > > PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
> > > > METHOD = POST/DELETE/GET;
> > > > ```
> > > >
> > > > > 3. Offloaders
> > > > > We are talking about BK metadata, how do Shadow Topics work with
> > > > > Offloaded ledgers ?
> > > > > Please clarify in the PIP
> > > >
> > > > Offloading a ledger is a kind of write operation on the topic's
> > > > metadata, so a shadow topic can't offload ledgers to other long-term
> > > > storage. However, for ledgers that are already offloaded by the source
> > > > topic, the shadow topic is expected to support reading from the offloaded
> > > > ledgers, just like reading from the source topic.
> > > >
> > > > The implementation depends on the shadow topic watching
> > > > `ManagedLedgerInfo` in the metadata store: if LedgerInfo.offloadContext
> > > > is updated by the source topic offloader, the shadow topic gets the full
> > > > information it needs to obtain a readHandle from ledgerOffload. And of
> > > > course, the precondition is that the shadow topic must have the same
> > > > offload driver settings.
> > > >
> > > > >
> > > > > 4. Changes in the number of partitions
> > > > > the PIP says that the number of partitions must match the source
> > > > > topic.
> > > > > Are we preventing changes to the number of partitions in the source
> > > > > topic?
> > > > >
> > > >
> > > > No, updates to the partition number will be synced to the shadow topic.
> > > > A source topic or partition will be responsible for the creation and
> > > > deletion of its corresponding shadow topic partitions.
> > > >
> > > > > 5. Topic stats
> > > > > We should add information on the source topic and on the shadow
> > topic.
> > > > > Please clarify or draft your intentions in the PIP
> > > > >
> > > >
> > > > For topic stats on the source topic, as the shadow replicator will reuse
> > > > most of the current PersistentReplicator, ReplicatorStatsImpl can also be
> > > > applied to shadow replicators.
> > > > And we need to add a new field in `TopicStatsImpl`, like geo-replication:
> > > > ```
> > > > Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;
> > > > ```
> > > >
> > > > As for topic stats on the shadow topic, the previous `TopicStatsImpl`
> > > > still applies. And I don't see any other stats that need to be added at
> > > > this point.
> > > >
> > > >
> > > > > 6. GeoReplication
> > > > > I guess that GeoReplication will not be possible for shadow topics.
> > > > > Please clarify on the PIP
> > > > >
> > > >
> > > > Yes, this is decided by the nature of the shadow topic: it doesn't have
> > > > write access to BK.
> > > > And I don't see the necessity of supporting GeoReplication for shadow
> > > > topics. We can make the source topic geo-replicated and create the same
> > > > shadow topic in each cluster.
> > > >
> > > > > I believe that this feature is very powerful, but we must design it
> > > > > carefully and discuss all the edge cases. Otherwise we will end up
> > > > > with something that is half-baked and we will have to resolve edge
> > > > > cases while developing or after going to production.
> > > > >
> > > > > Every feature must be fully integrated with the rest of Pulsar
> > > > >
> > > > > Enrico
> > > > >
> > > > > Il giorno mer 29 giu 2022 alle ore 08:40 Haiting Jiang
> > > > > <ji...@apache.org> ha scritto:
> > > > > >
> > > > > > Hi Penghui
> > > > > >
> > > > > > On 2022/06/29 04:07:35 PengHui Li wrote:
> > > > > > > Hi Haiting,
> > > > > > >
> > > > > > > Thanks for the explanation. I'm clear for now.
> > > > > > >
> > > > > > > > Pulsar functions also can do such things by connecting data from
> > > > > > > > one topic to another topic.
> > > > > > > > But the difference is this proposal only copies the data to the
> > > > > > > > cache of another topic, and the data not in the cache is also
> > > > > > > > available by reading from ledgers.
> > > > > > > >
> > > > > > > > And this approach also has the following benefits compared with
> > > > > > > > replicating data to multiple "real" topics.
> > > > > > > >
> > > > > > > > - reuse the topic metadata
> > > > > > > > - the same message ID, which makes troubleshooting easy
> > > > > > > Just one question
> > > > > > >
> > > > > > > >>>>>>>
> > > > > > > ```
> > > > > > > > message CommandSend {
> > > > > > > >     // ...
> > > > > > > >     // message id for shadow topic
> > > > > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > > Can we get the message ID from the replicated data to avoid
> > > > > > > > introducing a new command?
> > > > > > > > Or use a marker message to avoid direct broker-to-broker protobuf
> > > > > > > > command interaction.
> > > > > > >
> > > > > > > Sorry for not writing it clearly. CommandSend is not a new command.
> > > > > > > It's exactly the main command the producer uses to send messages to
> > > > > > > the broker. The only change is adding a new field to it.
> > > > > > > The whole command proto would be like this:
> > > > > > > ```
> > > > > > > message CommandSend {
> > > > > > >     required uint64 producer_id = 1;
> > > > > > >     required uint64 sequence_id = 2;
> > > > > > >     optional int32 num_messages = 3 [default = 1];
> > > > > > >     optional uint64 txnid_least_bits = 4 [default = 0];
> > > > > > >     optional uint64 txnid_most_bits = 5 [default = 0];
> > > > > > >
> > > > > > >     /// Add highest sequence id to support batch message with
> > > > > > >     /// external sequence id
> > > > > > >     optional uint64 highest_sequence_id = 6 [default = 0];
> > > > > > >     optional bool is_chunk = 7 [default = false];
> > > > > > >
> > > > > > >     // Specify if the message being published is a Pulsar marker or not
> > > > > > >     optional bool marker = 8 [default = false];
> > > > > > >
> > > > > > >     // message id for shadow topic
> > > > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > > > }
> > > > > > > ```
> > > > > > > So there won't be any direct broker-to-broker protobuf command
> > > > > > > interactions.
> > > > > >
> > > > > > Thanks,
> > > > > > Haiting
> > > > > >
> > > > > > > Thanks,
> > > > > > > Penghui
> > > > > > >
> > > > > > > On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <
> > jianghaiting@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Penghui & Asaf:
> > > > > > > >
> > > > > > > > > Please allow me to provide some more details about **metadata**
> > > > > > > > > synchronization between the source topic and the shadow topic.
> > > > > > > > >
> > > > > > > > > 1. When the shadow topic initializes, it will read from the
> > > > > > > > > metadata store path "/managed-ledgers/{source_topic_ledger_name}",
> > > > > > > > > which contains all the managed ledger info. We don't need to read
> > > > > > > > > the ledger information from the source topic broker.
> > > > > > > > >
> > > > > > > > > 2. When the shadow topic receives a new message from the replicator,
> > > > > > > > > if the ledger id of the message is the same as the last ledger, it
> > > > > > > > > just updates the LAC. If not, it will update the ledger list from
> > > > > > > > > metadata, and then open the new ledger handle and update the LAC.
> > > > > > > > >
> > > > > > > > > As for the copy itself and adding the shadow message id in
> > > > > > > > > CommandSend, it mostly serves the purpose of filling the EntryCache.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Haiting
> > > > > > > >
> > > > > > > > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > > > > > > > One question comes to mind here: Why not simply read the
> > ledger
> > > > > > > > information
> > > > > > > > > from original topic, without copy?
> > > > > > > > >
> > > > > > > > > I think this is a good idea.
> > > > > > > > >
> > > > > > > > > Penghui
> > > > > > > > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org,
> wrote:
> > > > > > > > > >
> > > > > > > > > > One question comes to mind here: Why not simply read the
> > ledger
> > > > > > > > information
> > > > > > > > > > from original topic, without copy?
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > > > BR,
> > > > Haiting
> > > >
> >
>

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@gmail.com>.
> I think that there are still some references to shadow_message_id but
> IIUC we don't need to add that field anymore because we are going to
> use


> optional MessageIdData message_id = 9;


> is this correct ?


Yes, no more `shadow_message_id`. Sorry I missed these references, already
updated.

Thanks,
Haiting

On Fri, Jul 29, 2022 at 5:15 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Il giorno ven 29 lug 2022 alle ore 09:05 Haiting Jiang
> <ji...@apache.org> ha scritto:
> >
> > Hi Enrico,
> >
> > Any further suggestion on this PIP?
> > If not, I would like to raise a  revote on this in a few days.
>
> now the PIP LGTM
>
> I think that there are still some references to shadow_message_id but
> IIUC we don't need to add that field anymore because we are going to
> use
>
> optional MessageIdData message_id = 9;
>
> is this correct ?
>
> Enrico
>
> >
> > Thanks,
> > Haiting
> >
> > On 2022/07/07 11:30:59 Haiting Jiang wrote:
> > > Hi Enrico,
> > >
> > > Thanks for your feedback.
> > >
> > > On 2022/07/05 08:03:43 Enrico Olivelli wrote:
> > > > I have a couple of additional questions.
> > > >
> > > > 1. Security
> > > > What about security permissions about the shadow topic ?
> > > > We are reading from another topic.
> > > > I think we must clarify the decisions in the PIP
> > >
> > > As shadow topic is usually in another namespace, it would have its own
> > > independent permission settings, and we can configure different
> permissions
> > > for source topic and shadow topic. So there would be no guarantee that
> you are
> > > allowed to consume shadow topic if you have permission to consume
> source
> > > topic.
> > >
> > > On the other hand, we uses topic policy to store shadow topic
> settings, so a
> > > new policy permission item needs be added as PolicyName.SHADOW_TOPIC,
> and user
> > > must have PolicyOperation.WRITE to this policy to create/delete shadow
> topics.
> > >
> > > >
> > > > 2. Truncation and deletion
> > > > What happens when you truncate or delete the source topic ?
> > > > please add a paragraph on the PIP
> > > >
> > >
> > > 1. Truncation, from command `bin/pulsar-admin topics truncate source-topic`.
> > > For source topic truncation, nothing changes. It still move all
> cursors to the
> > > end of the topic and delete all inactive ledgers.
> > > As shadow topic will watch `ManagedLedgerInfo` in metadata store, once
> it
> > > knows ledgers deleted, all cursors will skip all deleted ledgers.
> > >
> > > 2. Deletion, from command `bin/pulsar-admin topics delete source-topic`.
> > > Like geo-replication, topic deletion is forbidden if topic have shadow
> > > replicators, users have to delete shadow topics first. Here is the new
> admin
> > > API for managing shadow topics with source topic in
> > > `org.apache.pulsar.client.admin.Topics` :
> > > ```
> > > void createShadowTopic(String sourceTopicName, String shadowTopicName);
> > > void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
> > > List<String> admin.topics().getShadowTopics(String sourceTopicName);
> > >
> > > //And their async version methods.
> > > ```
> > > And this requires new REST interfaces in admin server, where
> > > ```
> > > PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
> > > METHOD = POST/DELETE/GET;
> > > ```
> > >
> > > > 3. Offloaders
> > > > We are talking about BK metadata, how do Shadow Topics work with
> > > > Offloaded ledgers ?
> > > > Please clarify in the PIP
> > >
> > > Offloading a ledger is a kind of writing operation to topic's
> metadata, so
> > > shadow topic can't offload ledgers to other long term storage.
> However, for
> > > ledgers thats are already offloaded by source topic, it's expected to
> support
> > > reading from offload ledgers in shadow topic, just like read from
> source
> > > topic.
> > >
> > > The implementation depends on shadow topic watching
> `ManagedLedgerInfo` in
> > > metadata store, and if LedgerInfo.offloadContext is updated by source
> topic
> > > offloader, shadow topic can get fully information to get a readHandle
> from
> > > ledgerOffload. And of course, the pre-condition is the shadow topic
> must have
> > > the same offload driver settings.
> > >
> > > >
> > > > 4. Changes in the number of partitions
> > > > the PIP says that the number of partitions must match the source
> topic.
> > > > Are we preventing changes to the number of partitions in the source
> topic ?
> > > >
> > >
> > > No, the updates on partition number will be synced to the shadow topic.
> > > A source topic or partition will be responsible for the creation and
> deletion
> > > of its corresponding shadow topic partitions.
> > >
> > > > 5. Topic stats
> > > > We should add information on the source topic and on the shadow
> topic.
> > > > Please clarify or draft your intentions in the PIP
> > > >
> > >
> > > For topic stats on source topic, as shadow replicator will reuse most
> of current
> > > PersistentReplicator, the ReplicatorStatsImpl also can be applied to
> shadow replicators.
> > > And we need to add a new field in `TopicStatsImpl` like
> geo-replication:
> > > ```
> > > Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;
> > > ```
> > >
> > > As for topic stats on shadow topic, previous `TopicStatsImpl` still
> applies.
> > > And I don't see any other stats need to be added at this point.
> > >
> > >
> > > > 6. GeoReplication
> > > > I guess that GeoReplication will not be possible for shadow topics.
> > > > Please clarify on the PIP
> > > >
> > >
> > > Yes, this is decided by the nature of shadow topic that it don't have
> the write access to BK.
> > > And I don't see the necessary of supporting GeoReplication for shadow
> topics.
> > > We can make source topic geo-replicated and create the same shadow
> topic in each clusters.
> > >
> > > > I believe that this feature is very powerful, but we must design it
> > > > carefully and discuss
> > > > about all the edge cases. Otherwise we will end up in something that
> > > > is half-baked
> > > > and we will have to resolve edge cases while developing or after
> going
> > > > to production.
> > > >
> > > > Every feature must be fully integrated with the rest of Pulsar
> > > >
> > > > Enrico
> > > >
> > > > Il giorno mer 29 giu 2022 alle ore 08:40 Haiting Jiang
> > > > <ji...@apache.org> ha scritto:
> > > > >
> > > > > Hi Penghui
> > > > >
> > > > > On 2022/06/29 04:07:35 PengHui Li wrote:
> > > > > > Hi Haiting,
> > > > > >
> > > > > > Thanks for the explanation. I'm clear for now.
> > > > > >
> > > > > > Pulsar functions also can do such things by connecting data from
> one topic
> > > > > > to another topic.
> > > > > > But the difference is this proposal only copies the data to the
> cache of
> > > > > > another topic, and the data not
> > > > > > in the cache is also available by reading from ledgers.
> > > > > >
> > > > > > And this approach also follows benefits compared with
> replicating data to
> > > > > > multiple "real" topics.
> > > > > >
> > > > > > - reuse the topic metadata
> > > > > > - the same message ID which easy for troubleshooting
> > > > > >
> > > > > > Just one question
> > > > > >
> > > > > > >>>>>>>
> > > > > > ```
> > > > > > message CommandSend { // ... // message id for shadow topic
> optional
> > > > > >    MessageIdData shadow_message_id = 9; }
> > > > > > ```
> > > > > >
> > > > > > Can we get the message ID from the replicated data to avoid
> introducing a
> > > > > > new command?
> > > > > > Or use a marker message to avoid broker-to-broker directly
> protobuf command
> > > > > > interaction.
> > > > > >
> > > > > Sorry for not wrote it clearly. CommandSend is not a new command.
> It's exactly the main
> > > > > command producer used to send message to broker. The only change
> is add a new field in it.
> > > > > The whole command proto would be like this:
> > > > > ```
> > > > > message CommandSend {
> > > > >     required uint64 producer_id = 1;
> > > > >     required uint64 sequence_id = 2;
> > > > >     optional int32 num_messages = 3 [default = 1];
> > > > >     optional uint64 txnid_least_bits = 4 [default = 0];
> > > > >     optional uint64 txnid_most_bits = 5 [default = 0];
> > > > >
> > > > >     /// Add highest sequence id to support batch message with external sequence id
> > > > >     optional uint64 highest_sequence_id = 6 [default = 0];
> > > > >     optional bool is_chunk     =7 [default = false];
> > > > >
> > > > >     // Specify if the message being published is a Pulsar marker or not
> > > > >     optional bool marker = 8 [default = false];
> > > > >
> > > > >     // message id for shadow topic
> > > > >     optional MessageIdData shadow_message_id = 9;
> > > > > }
> > > > > ```
> > > > > So there won't be any broker-to-broker directly protobuf command
> interactions.
> > > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > > Thanks,
> > > > > > Penghui
> > > > > >
> > > > > > On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <
> jianghaiting@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Penghui & Asaf:
> > > > > > >
> > > > > > > Please allow me to provide some more detailes about
> **metadata**
> > > > > > > synchronization
> > > > > > > between source topic and shadow topic.
> > > > > > >
> > > > > > > 1.When shadow topic initializes, it will read from metadata
> store path
> > > > > > > "/managed-ledgers/{source_topic_ledger_name}", which contains
> all the
> > > > > > > managed ledger info. We don't
> > > > > > > need to read the  ledger information from source topic broker.
> > > > > > >
> > > > > > > 2. When shadow topic received new message from replicator, if
> the ledger
> > > > > > > id of the message
> > > > > > >  is the same as the last ledger, it just updates the LAC. If
> not, it will
> > > > > > > update ledger list from metadata,
> > > > > > > and then open the new ledger handle and update the LAC.
> > > > > > >
> > > > > > > As for the copy itself and add shadow message id in
> CommandSend, it mostly
> > > > > > > serves the purpose
> > > > > > > of filling the EntryCache.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Haiting
> > > > > > >
> > > > > > > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > > > > > > One question comes to mind here: Why not simply read the
> ledger
> > > > > > > information
> > > > > > > > from original topic, without copy?
> > > > > > > >
> > > > > > > > I think this is a good idea.
> > > > > > > >
> > > > > > > > Penghui
> > > > > > > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > > > > > > > >
> > > > > > > > > One question comes to mind here: Why not simply read the
> ledger
> > > > > > > information
> > > > > > > > > from original topic, without copy?
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> > > BR,
> > > Haiting
> > >
>

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Enrico Olivelli <eo...@gmail.com>.
Il giorno ven 29 lug 2022 alle ore 09:05 Haiting Jiang
<ji...@apache.org> ha scritto:
>
> Hi Enrico,
>
> Any further suggestion on this PIP?
> If not, I would like to raise a  revote on this in a few days.

now the PIP LGTM

I think that there are still some references to shadow_message_id but
IIUC we don't need to add that field anymore because we are going to
use

optional MessageIdData message_id = 9;

is this correct ?

Enrico

>
> Thanks,
> Haiting
>
> On 2022/07/07 11:30:59 Haiting Jiang wrote:
> > Hi Enrico,
> >
> > Thanks for your feedback.
> >
> > On 2022/07/05 08:03:43 Enrico Olivelli wrote:
> > > I have a couple of additional questions.
> > >
> > > 1. Security
> > > What about security permissions about the shadow topic ?
> > > We are reading from another topic.
> > > I think we must clarify the decisions in the PIP
> >
> > As shadow topic is usually in another namespace, it would have its own
> > independent permission settings, and we can configure different permissions
> > for source topic and shadow topic. So there would be no guarantee that you are
> > allowed to consume shadow topic if you have permission to consume source
> > topic.
> >
> > On the other hand, we uses topic policy to store shadow topic settings, so a
> > new policy permission item needs be added as PolicyName.SHADOW_TOPIC, and user
> > must have PolicyOperation.WRITE to this policy to create/delete shadow topics.
> >
> > >
> > > 2. Truncation and deletion
> > > What happens when you truncate or delete the source topic ?
> > > please add a paragraph on the PIP
> > >
> >
> > 1. Truncation, from command `bin/pulsar-admin topics truncate source-topic`.
> > For source topic truncation, nothing changes. It still move all cursors to the
> > end of the topic and delete all inactive ledgers.
> > As shadow topic will watch `ManagedLedgerInfo` in metadata store, once it
> > knows ledgers deleted, all cursors will skip all deleted ledgers.
> >
> > 2. Deletion, from command `bin/pulsar-admin topics delete source-topic`.
> > Like geo-replication, topic deletion is forbidden if topic have shadow
> > replicators, users have to delete shadow topics first. Here is the new admin
> > API for managing shadow topics with source topic in
> > `org.apache.pulsar.client.admin.Topics` :
> > ```
> > void createShadowTopic(String sourceTopicName, String shadowTopicName);
> > void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
> > List<String> admin.topics().getShadowTopics(String sourceTopicName);
> >
> > //And their async version methods.
> > ```
> > And this requires new REST interfaces in admin server, where
> > ```
> > PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
> > METHOD = POST/DELETE/GET;
> > ```
> >
> > > 3. Offloaders
> > > We are talking about BK metadata, how do Shadow Topics work with
> > > Offloaded ledgers ?
> > > Please clarify in the PIP
> >
> > Offloading a ledger is a kind of writing operation to topic's metadata, so
> > shadow topic can't offload ledgers to other long term storage. However, for
> > ledgers thats are already offloaded by source topic, it's expected to support
> > reading from offload ledgers in shadow topic, just like read from source
> > topic.
> >
> > The implementation depends on shadow topic watching `ManagedLedgerInfo` in
> > metadata store, and if LedgerInfo.offloadContext is updated by source topic
> > offloader, shadow topic can get fully information to get a readHandle from
> > ledgerOffload. And of course, the pre-condition is the shadow topic must have
> > the same offload driver settings.
> >
> > >
> > > 4. Changes in the number of partitions
> > > the PIP says that the number of partitions must match the source topic.
> > > Are we preventing changes to the number of partitions in the source topic ?
> > >
> >
> > No, the updates on partition number will be synced to the shadow topic.
> > A source topic or partition will be responsible for the creation and deletion
> > of its corresponding shadow topic partitions.
> >
> > > 5. Topic stats
> > > We should add information on the source topic and on the shadow topic.
> > > Please clarify or draft your intentions in the PIP
> > >
> >
> > For topic stats on source topic, as shadow replicator will reuse most of current
> > PersistentReplicator, the ReplicatorStatsImpl also can be applied to shadow replicators.
> > And we need to add a new field in `TopicStatsImpl` like geo-replication:
> > ```
> > Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;
> > ```
> >
> > As for topic stats on shadow topic, previous `TopicStatsImpl` still applies.
> > And I don't see any other stats need to be added at this point.
> >
> >
> > > 6. GeoReplication
> > > I guess that GeoReplication will not be possible for shadow topics.
> > > Please clarify on the PIP
> > >
> >
> > Yes, this is decided by the nature of shadow topic that it don't have the write access to BK.
> > And I don't see the necessary of supporting GeoReplication for shadow topics.
> > We can make source topic geo-replicated and create the same shadow topic in each clusters.
> >
> > > I believe that this feature is very powerful, but we must design it
> > > carefully and discuss
> > > about all the edge cases. Otherwise we will end up in something that
> > > is half-baked
> > > and we will have to resolve edge cases while developing or after going
> > > to production.
> > >
> > > Every feature must be fully integrated with the rest of Pulsar
> > >
> > > Enrico
> > >
> > > Il giorno mer 29 giu 2022 alle ore 08:40 Haiting Jiang
> > > <ji...@apache.org> ha scritto:
> > > >
> > > > Hi Penghui
> > > >
> > > > On 2022/06/29 04:07:35 PengHui Li wrote:
> > > > > Hi Haiting,
> > > > >
> > > > > Thanks for the explanation. I'm clear for now.
> > > > >
> > > > > Pulsar functions also can do such things by connecting data from one topic
> > > > > to another topic.
> > > > > But the difference is this proposal only copies the data to the cache of
> > > > > another topic, and the data not
> > > > > in the cache is also available by reading from ledgers.
> > > > >
> > > > > And this approach also follows benefits compared with replicating data to
> > > > > multiple "real" topics.
> > > > >
> > > > > - reuse the topic metadata
> > > > > - the same message ID which easy for troubleshooting
> > > > >
> > > > > Just one question
> > > > >
> > > > > >>>>>>>
> > > > > ```
> > > > > message CommandSend { // ... // message id for shadow topic optional
> > > > >    MessageIdData shadow_message_id = 9; }
> > > > > ```
> > > > >
> > > > > Can we get the message ID from the replicated data to avoid introducing a
> > > > > new command?
> > > > > Or use a marker message to avoid broker-to-broker directly protobuf command
> > > > > interaction.
> > > > >
> > > > Sorry for not wrote it clearly. CommandSend is not a new command. It's exactly the main
> > > > command producer used to send message to broker. The only change is add a new field in it.
> > > > The whole command proto would be like this:
> > > > ```
> > > > message CommandSend {
> > > >     required uint64 producer_id = 1;
> > > >     required uint64 sequence_id = 2;
> > > >     optional int32 num_messages = 3 [default = 1];
> > > >     optional uint64 txnid_least_bits = 4 [default = 0];
> > > >     optional uint64 txnid_most_bits = 5 [default = 0];
> > > >
> > > >     /// Add highest sequence id to support batch message with external sequence id
> > > >     optional uint64 highest_sequence_id = 6 [default = 0];
> > > >     optional bool is_chunk     =7 [default = false];
> > > >
> > > >     // Specify if the message being published is a Pulsar marker or not
> > > >     optional bool marker = 8 [default = false];
> > > >
> > > >     // message id for shadow topic
> > > >     optional MessageIdData shadow_message_id = 9;
> > > > }
> > > > ```
> > > > So there won't be any broker-to-broker directly protobuf command interactions.
> > > >
> > > > Thanks,
> > > > Haiting
> > > >
> > > > > Thanks,
> > > > > Penghui
> > > > >
> > > > > On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <ji...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Penghui & Asaf:
> > > > > >
> > > > > > Please allow me to provide some more detailes about **metadata**
> > > > > > synchronization
> > > > > > between source topic and shadow topic.
> > > > > >
> > > > > > 1.When shadow topic initializes, it will read from metadata store path
> > > > > > "/managed-ledgers/{source_topic_ledger_name}", which contains all the
> > > > > > managed ledger info. We don't
> > > > > > need to read the  ledger information from source topic broker.
> > > > > >
> > > > > > 2. When shadow topic received new message from replicator, if the ledger
> > > > > > id of the message
> > > > > >  is the same as the last ledger, it just updates the LAC. If not, it will
> > > > > > update ledger list from metadata,
> > > > > > and then open the new ledger handle and update the LAC.
> > > > > >
> > > > > > As for the copy itself and add shadow message id in CommandSend, it mostly
> > > > > > serves the purpose
> > > > > > of filling the EntryCache.
> > > > > >
> > > > > > Thanks,
> > > > > > Haiting
> > > > > >
> > > > > > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > > information
> > > > > > > from original topic, without copy?
> > > > > > >
> > > > > > > I think this is a good idea.
> > > > > > >
> > > > > > > Penghui
> > > > > > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > > > > > > >
> > > > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > > information
> > > > > > > > from original topic, without copy?
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> > BR,
> > Haiting
> >

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Enrico,

Any further suggestions on this PIP?
If not, I would like to raise a revote on this in a few days.

Thanks,
Haiting

On 2022/07/07 11:30:59 Haiting Jiang wrote:
> Hi Enrico,
> 
> Thanks for your feedback.
> 
> On 2022/07/05 08:03:43 Enrico Olivelli wrote:
> > I have a couple of additional questions.
> > 
> > 1. Security
> > What about security permissions about the shadow topic ?
> > We are reading from another topic.
> > I think we must clarify the decisions in the PIP
> 
> As shadow topic is usually in another namespace, it would have its own
> independent permission settings, and we can configure different permissions
> for source topic and shadow topic. So there would be no guarantee that you are
> allowed to consume shadow topic if you have permission to consume source
> topic.
> 
> On the other hand, we uses topic policy to store shadow topic settings, so a
> new policy permission item needs be added as PolicyName.SHADOW_TOPIC, and user
> must have PolicyOperation.WRITE to this policy to create/delete shadow topics.
> 
> > 
> > 2. Truncation and deletion
> > What happens when you truncate or delete the source topic ?
> > please add a paragraph on the PIP
> > 
> 
> 1. Truncation, from command `bin/pulsar-admin topics truncate source-topic`.
> For source topic truncation, nothing changes. It still move all cursors to the
> end of the topic and delete all inactive ledgers. 
> As shadow topic will watch `ManagedLedgerInfo` in metadata store, once it
> knows ledgers deleted, all cursors will skip all deleted ledgers.
> 
> 2. Deletion, from command `bin/pulsar-admin topics delete source-topic`.
> Like geo-replication, topic deletion is forbidden if topic have shadow
> replicators, users have to delete shadow topics first. Here is the new admin
> API for managing shadow topics with source topic in
> `org.apache.pulsar.client.admin.Topics` :
> ```
> void createShadowTopic(String sourceTopicName, String shadowTopicName);
> void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
> List<String> admin.topics().getShadowTopics(String sourceTopicName);
> 
> //And their async version methods.
> ```
> And this requires new REST interfaces in admin server, where
> ```
> PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
> METHOD = POST/DELETE/GET;
> ```
> 
> > 3. Offloaders
> > We are talking about BK metadata, how do Shadow Topics work with
> > Offloaded ledgers ?
> > Please clarify in the PIP
> 
> Offloading a ledger is a kind of writing operation to topic's metadata, so
> shadow topic can't offload ledgers to other long term storage. However, for
> ledgers thats are already offloaded by source topic, it's expected to support
> reading from offload ledgers in shadow topic, just like read from source
> topic.
> 
> The implementation depends on shadow topic watching `ManagedLedgerInfo` in
> metadata store, and if LedgerInfo.offloadContext is updated by source topic
> offloader, shadow topic can get fully information to get a readHandle from
> ledgerOffload. And of course, the pre-condition is the shadow topic must have
> the same offload driver settings.
> 
> > 
> > 4. Changes in the number of partitions
> > the PIP says that the number of partitions must match the source topic.
> > Are we preventing changes to the number of partitions in the source topic ?
> > 
> 
> No, the updates on partition number will be synced to the shadow topic. 
> A source topic or partition will be responsible for the creation and deletion 
> of its corresponding shadow topic partitions.
> 
> > 5. Topic stats
> > We should add information on the source topic and on the shadow topic.
> > Please clarify or draft your intentions in the PIP
> > 
> 
> For topic stats on source topic, as shadow replicator will reuse most of current 
> PersistentReplicator, the ReplicatorStatsImpl also can be applied to shadow replicators.
> And we need to add a new field in `TopicStatsImpl` like geo-replication:
> ```
> Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;
> ```
> 
> As for topic stats on shadow topic, previous `TopicStatsImpl` still applies. 
> And I don't see any other stats need to be added at this point.
> 
> 
> > 6. GeoReplication
> > I guess that GeoReplication will not be possible for shadow topics.
> > Please clarify on the PIP
> > 
> 
> Yes, this is decided by the nature of shadow topic that it don't have the write access to BK.
> And I don't see the necessary of supporting GeoReplication for shadow topics.
> We can make source topic geo-replicated and create the same shadow topic in each clusters.
> 
> > I believe that this feature is very powerful, but we must design it
> > carefully and discuss
> > about all the edge cases. Otherwise we will end up in something that
> > is half-baked
> > and we will have to resolve edge cases while developing or after going
> > to production.
> > 
> > Every feature must be fully integrated with the rest of Pulsar
> > 
> > Enrico
> > 
> > Il giorno mer 29 giu 2022 alle ore 08:40 Haiting Jiang
> > <ji...@apache.org> ha scritto:
> > >
> > > Hi Penghui
> > >
> > > On 2022/06/29 04:07:35 PengHui Li wrote:
> > > > Hi Haiting,
> > > >
> > > > Thanks for the explanation. I'm clear for now.
> > > >
> > > > Pulsar functions also can do such things by connecting data from one topic
> > > > to another topic.
> > > > But the difference is this proposal only copies the data to the cache of
> > > > another topic, and the data not
> > > > in the cache is also available by reading from ledgers.
> > > >
> > > > And this approach also follows benefits compared with replicating data to
> > > > multiple "real" topics.
> > > >
> > > > - reuse the topic metadata
> > > > - the same message ID which easy for troubleshooting
> > > >
> > > > Just one question
> > > >
> > > > >>>>>>>
> > > > ```
> > > > message CommandSend { // ... // message id for shadow topic optional
> > > >    MessageIdData shadow_message_id = 9; }
> > > > ```
> > > >
> > > > Can we get the message ID from the replicated data to avoid introducing a
> > > > new command?
> > > > Or use a marker message to avoid broker-to-broker directly protobuf command
> > > > interaction.
> > > >
> > > Sorry for not wrote it clearly. CommandSend is not a new command. It's exactly the main
> > > command producer used to send message to broker. The only change is add a new field in it.
> > > The whole command proto would be like this:
> > > ```
> > > message CommandSend {
> > >     required uint64 producer_id = 1;
> > >     required uint64 sequence_id = 2;
> > >     optional int32 num_messages = 3 [default = 1];
> > >     optional uint64 txnid_least_bits = 4 [default = 0];
> > >     optional uint64 txnid_most_bits = 5 [default = 0];
> > >
> > >     /// Add highest sequence id to support batch message with external sequence id
> > >     optional uint64 highest_sequence_id = 6 [default = 0];
> > >     optional bool is_chunk     =7 [default = false];
> > >
> > >     // Specify if the message being published is a Pulsar marker or not
> > >     optional bool marker = 8 [default = false];
> > >
> > >     // message id for shadow topic
> > >     optional MessageIdData shadow_message_id = 9;
> > > }
> > > ```
> > > So there won't be any broker-to-broker directly protobuf command interactions.
> > >
> > > Thanks,
> > > Haiting
> > >
> > > > Thanks,
> > > > Penghui
> > > >
> > > > On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <ji...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Penghui & Asaf:
> > > > >
> > > > > Please allow me to provide some more detailes about **metadata**
> > > > > synchronization
> > > > > between source topic and shadow topic.
> > > > >
> > > > > 1.When shadow topic initializes, it will read from metadata store path
> > > > > "/managed-ledgers/{source_topic_ledger_name}", which contains all the
> > > > > managed ledger info. We don't
> > > > > need to read the  ledger information from source topic broker.
> > > > >
> > > > > 2. When shadow topic received new message from replicator, if the ledger
> > > > > id of the message
> > > > >  is the same as the last ledger, it just updates the LAC. If not, it will
> > > > > update ledger list from metadata,
> > > > > and then open the new ledger handle and update the LAC.
> > > > >
> > > > > As for the copy itself and add shadow message id in CommandSend, it mostly
> > > > > serves the purpose
> > > > > of filling the EntryCache.
> > > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > information
> > > > > > from original topic, without copy?
> > > > > >
> > > > > > I think this is a good idea.
> > > > > >
> > > > > > Penghui
> > > > > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > > > > > >
> > > > > > > One question comes to mind here: Why not simply read the ledger
> > > > > information
> > > > > > > from original topic, without copy?
> > > > > >
> > > > >
> > > >
> > 
> 
> BR,
> Haiting
> 

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Enrico,

Thanks for your feedback.

On 2022/07/05 08:03:43 Enrico Olivelli wrote:
> I have a couple of additional questions.
> 
> 1. Security
> What about security permissions about the shadow topic ?
> We are reading from another topic.
> I think we must clarify the decisions in the PIP

As a shadow topic is usually in another namespace, it has its own
independent permission settings, and we can configure different permissions
for the source topic and the shadow topic. So permission to consume the source
topic does not guarantee permission to consume the shadow topic.

On the other hand, we use a topic policy to store shadow topic settings, so a
new policy permission item needs to be added as PolicyName.SHADOW_TOPIC, and a user
must have PolicyOperation.WRITE on this policy to create/delete shadow topics.

> 
> 2. Truncation and deletion
> What happens when you truncate or delete the source topic ?
> please add a paragraph on the PIP
> 

1. Truncation, from the command `bin/pulsar-admin topics truncate source-topic`.
For source topic truncation, nothing changes. It still moves all cursors to the
end of the topic and deletes all inactive ledgers.
As the shadow topic watches `ManagedLedgerInfo` in the metadata store, once it
learns that ledgers were deleted, all its cursors will skip the deleted ledgers.

2. Deletion, from the command `bin/pulsar-admin topics delete source-topic`.
As with geo-replication, topic deletion is forbidden if the topic has shadow
replicators; users have to delete the shadow topics first. Here is the new admin
API for managing the shadow topics of a source topic, in
`org.apache.pulsar.client.admin.Topics`:
```
void createShadowTopic(String sourceTopicName, String shadowTopicName);
void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
List<String> getShadowTopics(String sourceTopicName);

// And their async versions.
```
And this requires new REST interfaces in admin server, where
```
PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
METHOD = POST/DELETE/GET;
```
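
To make the deletion rule concrete, here is a minimal in-memory sketch. It is
not the broker implementation: the class `ShadowTopicRegistrySketch` and the
method `deleteSourceTopic` are hypothetical names used only for illustration,
and the three registry methods simply mirror the signatures proposed above.
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the proposed rule; not Pulsar code.
class ShadowTopicRegistrySketch {
    // source topic name -> shadow topics currently registered on it
    private final Map<String, Set<String>> shadowsBySource = new HashMap<>();

    void createShadowTopic(String sourceTopicName, String shadowTopicName) {
        shadowsBySource
                .computeIfAbsent(sourceTopicName, k -> new LinkedHashSet<>())
                .add(shadowTopicName);
    }

    void deleteShadowTopic(String sourceTopicName, String shadowTopicName) {
        Set<String> shadows = shadowsBySource.get(sourceTopicName);
        if (shadows != null) {
            shadows.remove(shadowTopicName);
            if (shadows.isEmpty()) {
                shadowsBySource.remove(sourceTopicName);
            }
        }
    }

    List<String> getShadowTopics(String sourceTopicName) {
        return new ArrayList<>(shadowsBySource.getOrDefault(
                sourceTopicName, Collections.<String>emptySet()));
    }

    // Assumed guard: reject deletion while any shadow topic is still registered.
    void deleteSourceTopic(String sourceTopicName) {
        if (!getShadowTopics(sourceTopicName).isEmpty()) {
            throw new IllegalStateException(
                    "Delete shadow topics of " + sourceTopicName + " first");
        }
        // The real topic deletion would happen here.
    }

    public static void main(String[] args) {
        ShadowTopicRegistrySketch registry = new ShadowTopicRegistrySketch();
        registry.createShadowTopic("source-topic", "shadow-topic");
        try {
            registry.deleteSourceTopic("source-topic");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        registry.deleteShadowTopic("source-topic", "shadow-topic");
        registry.deleteSourceTopic("source-topic"); // allowed now
    }
}
```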

> 3. Offloaders
> We are talking about BK metadata, how do Shadow Topics work with
> Offloaded ledgers ?
> Please clarify in the PIP

Offloading a ledger is a kind of write operation on the topic's metadata, so a
shadow topic can't offload ledgers to other long-term storage. However, for
ledgers that are already offloaded by the source topic, the shadow topic is
expected to support reading from the offloaded ledgers, just like reading from
the source topic.

The implementation depends on the shadow topic watching `ManagedLedgerInfo` in
the metadata store: if LedgerInfo.offloadContext is updated by the source topic
offloader, the shadow topic has the full information needed to get a readHandle
from ledgerOffload. And of course, the precondition is that the shadow topic
must have the same offload driver settings.

> 
> 4. Changes in the number of partitions
> the PIP says that the number of partitions must match the source topic.
> Are we preventing changes to the number of partitions in the source topic ?
> 

No, updates to the partition number will be synced to the shadow topic.
Each source topic or partition will be responsible for creating and deleting
its corresponding shadow topic partitions.

> 5. Topic stats
> We should add information on the source topic and on the shadow topic.
> Please clarify or draft your intentions in the PIP
> 

For topic stats on the source topic: since the shadow replicator will reuse most of the current
PersistentReplicator, ReplicatorStatsImpl can also be applied to shadow replicators.
We need to add a new field to `TopicStatsImpl`, as for geo-replication:
```
Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;
```

As for topic stats on the shadow topic, the existing `TopicStatsImpl` still applies.
I don't see any other stats that need to be added at this point.


> 6. GeoReplication
> I guess that GeoReplication will not be possible for shadow topics.
> Please clarify on the PIP
> 

Yes, this follows from the nature of a shadow topic: it doesn't have write access to BK.
I also don't see the need to support GeoReplication for shadow topics.
We can make the source topic geo-replicated and create the same shadow topic in each cluster.

> I believe that this feature is very powerful, but we must design it
> carefully and discuss
> about all the edge cases. Otherwise we will end up in something that
> is half-baked
> and we will have to resolve edge cases while developing or after going
> to production.
> 
> Every feature must be fully integrated with the rest of Pulsar
> 
> Enrico
> 
> Il giorno mer 29 giu 2022 alle ore 08:40 Haiting Jiang
> <ji...@apache.org> ha scritto:
> >
> > Hi Penghui
> >
> > On 2022/06/29 04:07:35 PengHui Li wrote:
> > > Hi Haiting,
> > >
> > > Thanks for the explanation. I'm clear for now.
> > >
> > > Pulsar functions also can do such things by connecting data from one topic
> > > to another topic.
> > > But the difference is this proposal only copies the data to the cache of
> > > another topic, and the data not
> > > in the cache is also available by reading from ledgers.
> > >
> > > And this approach also follows benefits compared with replicating data to
> > > multiple "real" topics.
> > >
> > > - reuse the topic metadata
> > > - the same message ID which easy for troubleshooting
> > >
> > > Just one question
> > >
> > > >>>>>>>
> > > ```
> > > message CommandSend {
> > >     // ...
> > >     // message id for shadow topic
> > >     optional MessageIdData shadow_message_id = 9;
> > > }
> > > ```
> > >
> > > Can we get the message ID from the replicated data to avoid introducing a
> > > new command?
> > > Or use a marker message to avoid broker-to-broker directly protobuf command
> > > interaction.
> > >
> > Sorry for not wrote it clearly. CommandSend is not a new command. It's exactly the main
> > command producer used to send message to broker. The only change is add a new field in it.
> > The whole command proto would be like this:
> > ```
> > message CommandSend {
> >     required uint64 producer_id = 1;
> >     required uint64 sequence_id = 2;
> >     optional int32 num_messages = 3 [default = 1];
> >     optional uint64 txnid_least_bits = 4 [default = 0];
> >     optional uint64 txnid_most_bits = 5 [default = 0];
> >
> >     /// Add highest sequence id to support batch message with external sequence id
> >     optional uint64 highest_sequence_id = 6 [default = 0];
> >     optional bool is_chunk     =7 [default = false];
> >
> >     // Specify if the message being published is a Pulsar marker or not
> >     optional bool marker = 8 [default = false];
> >
> >     // message id for shadow topic
> >     optional MessageIdData shadow_message_id = 9;
> > }
> > ```
> > So there won't be any broker-to-broker directly protobuf command interactions.
> >
> > Thanks,
> > Haiting
> >
> > > Thanks,
> > > Penghui
> > >
> > > On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <ji...@apache.org>
> > > wrote:
> > >
> > > > Hi Penghui & Asaf:
> > > >
> > > > Please allow me to provide some more detailes about **metadata**
> > > > synchronization
> > > > between source topic and shadow topic.
> > > >
> > > > 1.When shadow topic initializes, it will read from metadata store path
> > > > "/managed-ledgers/{source_topic_ledger_name}", which contains all the
> > > > managed ledger info. We don't
> > > > need to read the  ledger information from source topic broker.
> > > >
> > > > 2. When shadow topic received new message from replicator, if the ledger
> > > > id of the message
> > > >  is the same as the last ledger, it just updates the LAC. If not, it will
> > > > update ledger list from metadata,
> > > > and then open the new ledger handle and update the LAC.
> > > >
> > > > As for the copy itself and add shadow message id in CommandSend, it mostly
> > > > serves the purpose
> > > > of filling the EntryCache.
> > > >
> > > > Thanks,
> > > > Haiting
> > > >
> > > > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > information
> > > > > from original topic, without copy?
> > > > >
> > > > > I think this is a good idea.
> > > > >
> > > > > Penghui
> > > > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > > > > >
> > > > > > One question comes to mind here: Why not simply read the ledger
> > > > information
> > > > > > from original topic, without copy?
> > > > >
> > > >
> > >
> 

BR,
Haiting

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Enrico Olivelli <eo...@gmail.com>.
I have a couple of additional questions.

1. Security
What about security permissions for the shadow topic?
We are reading from another topic.
I think we must clarify the decisions in the PIP.

2. Truncation and deletion
What happens when you truncate or delete the source topic?
Please add a paragraph to the PIP.

3. Offloaders
We are talking about BK metadata, how do Shadow Topics work with
Offloaded ledgers ?
Please clarify in the PIP

4. Changes in the number of partitions
The PIP says that the number of partitions must match the source topic.
Are we preventing changes to the number of partitions in the source topic?

5. Topic stats
We should add information on the source topic and on the shadow topic.
Please clarify or draft your intentions in the PIP

6. GeoReplication
I guess that GeoReplication will not be possible for shadow topics.
Please clarify on the PIP

I believe that this feature is very powerful, but we must design it
carefully and discuss all the edge cases. Otherwise we will end up with
something that is half-baked, and we will have to resolve edge cases while
developing or after going to production.

Every feature must be fully integrated with the rest of Pulsar.

Enrico

Il giorno mer 29 giu 2022 alle ore 08:40 Haiting Jiang
<ji...@apache.org> ha scritto:
>
> Hi Penghui
>
> On 2022/06/29 04:07:35 PengHui Li wrote:
> > Hi Haiting,
> >
> > Thanks for the explanation. I'm clear for now.
> >
> > Pulsar functions also can do such things by connecting data from one topic
> > to another topic.
> > But the difference is this proposal only copies the data to the cache of
> > another topic, and the data not
> > in the cache is also available by reading from ledgers.
> >
> > And this approach also follows benefits compared with replicating data to
> > multiple "real" topics.
> >
> > - reuse the topic metadata
> > - the same message ID which easy for troubleshooting
> >
> > Just one question
> >
> > >>>>>>>
> > ```
> > message CommandSend {
> >     // ...
> >     // message id for shadow topic
> >     optional MessageIdData shadow_message_id = 9;
> > }
> > ```
> >
> > Can we get the message ID from the replicated data to avoid introducing a
> > new command?
> > Or use a marker message to avoid broker-to-broker directly protobuf command
> > interaction.
> >
> Sorry for not wrote it clearly. CommandSend is not a new command. It's exactly the main
> command producer used to send message to broker. The only change is add a new field in it.
> The whole command proto would be like this:
> ```
> message CommandSend {
>     required uint64 producer_id = 1;
>     required uint64 sequence_id = 2;
>     optional int32 num_messages = 3 [default = 1];
>     optional uint64 txnid_least_bits = 4 [default = 0];
>     optional uint64 txnid_most_bits = 5 [default = 0];
>
>     /// Add highest sequence id to support batch message with external sequence id
>     optional uint64 highest_sequence_id = 6 [default = 0];
>     optional bool is_chunk     =7 [default = false];
>
>     // Specify if the message being published is a Pulsar marker or not
>     optional bool marker = 8 [default = false];
>
>     // message id for shadow topic
>     optional MessageIdData shadow_message_id = 9;
> }
> ```
> So there won't be any broker-to-broker directly protobuf command interactions.
>
> Thanks,
> Haiting
>
> > Thanks,
> > Penghui
> >
> > On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <ji...@apache.org>
> > wrote:
> >
> > > Hi Penghui & Asaf:
> > >
> > > Please allow me to provide some more detailes about **metadata**
> > > synchronization
> > > between source topic and shadow topic.
> > >
> > > 1.When shadow topic initializes, it will read from metadata store path
> > > "/managed-ledgers/{source_topic_ledger_name}", which contains all the
> > > managed ledger info. We don't
> > > need to read the  ledger information from source topic broker.
> > >
> > > 2. When shadow topic received new message from replicator, if the ledger
> > > id of the message
> > >  is the same as the last ledger, it just updates the LAC. If not, it will
> > > update ledger list from metadata,
> > > and then open the new ledger handle and update the LAC.
> > >
> > > As for the copy itself and add shadow message id in CommandSend, it mostly
> > > serves the purpose
> > > of filling the EntryCache.
> > >
> > > Thanks,
> > > Haiting
> > >
> > > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > > One question comes to mind here: Why not simply read the ledger
> > > information
> > > > from original topic, without copy?
> > > >
> > > > I think this is a good idea.
> > > >
> > > > Penghui
> > > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > > > >
> > > > > One question comes to mind here: Why not simply read the ledger
> > > information
> > > > > from original topic, without copy?
> > > >
> > >
> >

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Penghui

On 2022/06/29 04:07:35 PengHui Li wrote:
> Hi Haiting,
> 
> Thanks for the explanation. I'm clear for now.
> 
> Pulsar functions also can do such things by connecting data from one topic
> to another topic.
> But the difference is this proposal only copies the data to the cache of
> another topic, and the data not
> in the cache is also available by reading from ledgers.
> 
> And this approach also follows benefits compared with replicating data to
> multiple "real" topics.
> 
> - reuse the topic metadata
> - the same message ID which easy for troubleshooting
> 
> Just one question
> 
> >>>>>>>
> ```
> message CommandSend {
>     // ...
>     // message id for shadow topic
>     optional MessageIdData shadow_message_id = 9;
> }
> ```
> 
> Can we get the message ID from the replicated data to avoid introducing a
> new command?
> Or use a marker message to avoid broker-to-broker directly protobuf command
> interaction.
> 
Sorry for not writing it clearly. CommandSend is not a new command. It's exactly the main
command a producer uses to send messages to the broker. The only change is a new field in it.
The whole command proto would look like this:
```
message CommandSend {
    required uint64 producer_id = 1;
    required uint64 sequence_id = 2;
    optional int32 num_messages = 3 [default = 1];
    optional uint64 txnid_least_bits = 4 [default = 0];
    optional uint64 txnid_most_bits = 5 [default = 0];

    /// Add highest sequence id to support batch message with external sequence id
    optional uint64 highest_sequence_id = 6 [default = 0];
    optional bool is_chunk = 7 [default = false];

    // Specify if the message being published is a Pulsar marker or not
    optional bool marker = 8 [default = false];
    
    // message id for shadow topic
    optional MessageIdData shadow_message_id = 9;
}
```
So there won't be any direct broker-to-broker protobuf command interaction.

Thanks,
Haiting

> Thanks,
> Penghui
> 
> On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <ji...@apache.org>
> wrote:
> 
> > Hi Penghui & Asaf:
> >
> > Please allow me to provide some more detailes about **metadata**
> > synchronization
> > between source topic and shadow topic.
> >
> > 1.When shadow topic initializes, it will read from metadata store path
> > "/managed-ledgers/{source_topic_ledger_name}", which contains all the
> > managed ledger info. We don't
> > need to read the  ledger information from source topic broker.
> >
> > 2. When shadow topic received new message from replicator, if the ledger
> > id of the message
> >  is the same as the last ledger, it just updates the LAC. If not, it will
> > update ledger list from metadata,
> > and then open the new ledger handle and update the LAC.
> >
> > As for the copy itself and add shadow message id in CommandSend, it mostly
> > serves the purpose
> > of filling the EntryCache.
> >
> > Thanks,
> > Haiting
> >
> > On 2022/06/23 02:08:46 PengHui Li wrote:
> > > > One question comes to mind here: Why not simply read the ledger
> > information
> > > from original topic, without copy?
> > >
> > > I think this is a good idea.
> > >
> > > Penghui
> > > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > > >
> > > > One question comes to mind here: Why not simply read the ledger
> > information
> > > > from original topic, without copy?
> > >
> >
> 

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by PengHui Li <pe...@apache.org>.
Hi Haiting,

Thanks for the explanation. I'm clear for now.

Pulsar Functions can also do such things by piping data from one topic
to another topic.
The difference is that this proposal only copies the data to the cache of
another topic, and data not in the cache is still available by reading
from ledgers.

This approach also has the following benefits compared with replicating data to
multiple "real" topics.

- it reuses the topic metadata
- it keeps the same message IDs, which makes troubleshooting easier

Just one question

>>>>>>>
```
message CommandSend {
    // ...
    // message id for shadow topic
    optional MessageIdData shadow_message_id = 9;
}
```

Can we get the message ID from the replicated data to avoid introducing a
new command?
Or use a marker message to avoid direct broker-to-broker protobuf command
interaction?

Thanks,
Penghui

On Wed, Jun 29, 2022 at 10:31 AM Haiting Jiang <ji...@apache.org>
wrote:

> Hi Penghui & Asaf:
>
> Please allow me to provide some more detailes about **metadata**
> synchronization
> between source topic and shadow topic.
>
> 1.When shadow topic initializes, it will read from metadata store path
> "/managed-ledgers/{source_topic_ledger_name}", which contains all the
> managed ledger info. We don't
> need to read the  ledger information from source topic broker.
>
> 2. When shadow topic received new message from replicator, if the ledger
> id of the message
>  is the same as the last ledger, it just updates the LAC. If not, it will
> update ledger list from metadata,
> and then open the new ledger handle and update the LAC.
>
> As for the copy itself and add shadow message id in CommandSend, it mostly
> serves the purpose
> of filling the EntryCache.
>
> Thanks,
> Haiting
>
> On 2022/06/23 02:08:46 PengHui Li wrote:
> > > One question comes to mind here: Why not simply read the ledger
> information
> > from original topic, without copy?
> >
> > I think this is a good idea.
> >
> > Penghui
> > On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> > >
> > > One question comes to mind here: Why not simply read the ledger
> information
> > > from original topic, without copy?
> >
>

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Penghui & Asaf:

Please allow me to provide some more details about **metadata** synchronization
between the source topic and the shadow topic.

1. When a shadow topic initializes, it reads from the metadata store path
"/managed-ledgers/{source_topic_ledger_name}", which contains all the managed ledger info. We don't
need to read the ledger information from the source topic's broker.

2. When a shadow topic receives a new message from the replicator, if the ledger id of the message
is the same as the last ledger, it just updates the LAC. If not, it updates the ledger list from the
metadata store, then opens the new ledger handle and updates the LAC.

As for the copy itself and the shadow message id added to CommandSend, they mostly serve the purpose
of filling the EntryCache.
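
Step 2 above can be sketched as follows. This is only a model of the decision, not the `ShadowManagedLedger` code: `ShadowLedgerTrackerSketch` and its members are hypothetical names, and the metadata-store read and ledger open are simulated with a counter and a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how a shadow topic tracks the source topic's ledgers.
final class ShadowLedgerTrackerSketch {
    private long currentLedgerId;
    private long lastConfirmedEntry;      // LAC within currentLedgerId
    private int metadataRefreshes = 0;    // simulated metadata-store reads
    private final List<Long> openedLedgers = new ArrayList<>();

    ShadowLedgerTrackerSketch(long initialLedgerId, long initialLac) {
        this.currentLedgerId = initialLedgerId;
        this.lastConfirmedEntry = initialLac;
    }

    // Called for each replicated message; (ledgerId, entryId) is the message id
    // on the source topic, carried in shadow_message_id.
    void onReplicatedMessage(long ledgerId, long entryId) {
        if (ledgerId == currentLedgerId) {
            // Same ledger as before: only the LAC needs to advance.
            lastConfirmedEntry = Math.max(lastConfirmedEntry, entryId);
            return;
        }
        // The source topic rolled over to a new ledger: refresh the ledger list
        // from the metadata store (simulated), open the new ledger, set the LAC.
        metadataRefreshes++;
        openedLedgers.add(ledgerId);
        currentLedgerId = ledgerId;
        lastConfirmedEntry = entryId;
    }

    long currentLedgerId() { return currentLedgerId; }
    long lastConfirmedEntry() { return lastConfirmedEntry; }
    int metadataRefreshes() { return metadataRefreshes; }
    List<Long> openedLedgers() { return openedLedgers; }
}
```

The point of the sketch is that the metadata store is only consulted on ledger rollover, so the common path is a cheap in-memory LAC update.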

Thanks, 
Haiting

On 2022/06/23 02:08:46 PengHui Li wrote:
> > One question comes to mind here: Why not simply read the ledger information
> from original topic, without copy?
> 
> I think this is a good idea.
> 
> Penghui
> On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
> >
> > One question comes to mind here: Why not simply read the ledger information
> > from original topic, without copy?
> 

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by PengHui Li <co...@gmail.com>.
> One question comes to mind here: Why not simply read the ledger information
> from original topic, without copy?

I think this is a good idea.

Penghui
On Jun 22, 2022, 23:57 +0800, dev@pulsar.apache.org, wrote:
>
> One question comes to mind here: Why not simply read the ledger information
> from original topic, without copy?

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Asaf Mesika <as...@gmail.com>.
Hi,

I did a quick reading and I couldn't understand the gist of this change:
The shadow topic doesn't really have its own messages, or its own ledgers,
right? When it reads messages, it reads from the original topic ledgers. So
the only thing you need to do is sync the "metadata" - ledgers list?
One question comes to mind here: Why not simply read the ledger information
from original topic, without copy?

Another question - I couldn't understand why you need to change the
protocol to introduce shadow message id. Can you please explain that to me?
Is CommandSend used only internally between Pulsar Clusters or used by a
Pulsar Client?

Thanks,

Asaf

On Tue, Jun 21, 2022 at 11:00 AM Haiting Jiang <ji...@apache.org>
wrote:

> Hi Pulsar community:
>
> I open a pip to discuss "Shadow Topic, an alternative way to support
> readonly topic ownership."
>
> Proposal Link: https://github.com/apache/pulsar/issues/16153
>
> ---
>
> ## Motivation
>
> The motivation is the same as PIP-63[1], with a new broadcast use case of
> supporting 100K subscriptions in a single topic.
> 1. The bandwidth of a broker limits the number of subscriptions for a
> single
>    topic.
> 2. Subscriptions are competing for the network bandwidth on brokers.
> Different
>    subscriptions might have different levels of severity.
> 3. When synchronizing cross-city message reading, cross-city access needs
> to
>    be minimized.
> 4. [New] Broadcast with 100K subscriptions. There is a limitation of the
>    subscription number of a single topic. It's tested by Hongjie from NTT
> Lab
>    that with 40K subscriptions in a single topic, the client needs about
> 20min
>    to start all client connections, and under 1 msg/s message producer
> rate,
>    the average end to end latency is about 2.9s. And for 100K
> subscriptions,
>    the time of start connection and E2E latency is beyond consideration.
>
> However, it's too complicated to implement with original PIP-63 proposal,
> the
> changed code is already over 3K+ lines, see PR#11960[2], and there are
> still
> some problems left,
> 1. The LAC in readonly topic is updated in a polling pattern, which
> increases
>    the bookie load bookie.
> 2. The message data of readonly topic won't be cached in broker. Increase
> the
>    network usage between broker and bookie when there are more than one
>    subscriber is tail-reading.
> 3. All the subscriptions is managed in original writable-topic, so the
> support
>    max subscription number is not scaleable.
>
> This PIP tries to come up with a simpler solution to support readonly topic
> ownership and solve the problems the previous PR left. The main idea of
> this
> solution is to reuse the feature of geo-replication, but instead of
> duplicating storage, it shares underlying bookie ledgers between different
> topics.
>
> ## Goal
>
> The goal is to introduce **Shadow Topic** as a new type of topic to support
> readonly topic ownership. Just as its name implies, a shadow topic is the
> shadow of some normal persistent topic (let's call it source topic here).
> The
> source topic and the shadow topic must have the same number of partitions
> or
> both non-partitioned. Multiply shadow topics can be created from a source
> topic.
>
> Shadow topic shares the underlying bookie ledgers from its source topic.
> User
> can't produce any messages to shadow topic directly and shadow topic don't
> create any new ledger for messages, all messages in shadow topic come from
> source topic.
>
> Shadow topic have its own subscriptions and don't share with its source
> topic.
> This means the shadow topic have its own cursor ledger to store persistent
> mark-delete info for each persistent subscriptions.
>
> The message sync procedure of shadow topic is supported by shadow
> replication,
> which is very like geo-replication, with these difference:
> 1. Geo-replication only works between topic with the same name in different
>    broker clusters. But shadow topic have no naming limitation and they
> can be
>    in the same cluster.
> 2. Geo-replication duplicates data storage, but shadow topic don't.
> 3. Geo-replication replicates data from each other, it's bidirectional, but
>    shadow replication only have one way data flow.
>
>
> ## API Changes
>
> 1. PulsarApi.proto.
>
> Shadow topic need to know the original message id of the replicated
> messages,
> in order to update new ledger and lac. So we need add a
> `shadow_message_id` in
> CommandSend for replicator.
>
> ```
> message CommandSend {
>     // ...
>     // message id for shadow topic
>     optional MessageIdData shadow_message_id = 9;
> }
> ```
>
> 2. Admin API for creating shadow topic with source topic
> ```
>    admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> ```
>
> ## Implementation
>
> A picture showing key components relations is added in github issue [3].
>
> There are two key changes for implementation.
> 1. How to replicate messages to shadow topics.
> 2. How shadow topic manage shared ledgers info.
>
> ### 1. How to replicate messages to shadow topics.
>
> This part is mostly implemented by `ShadowReplicator`, which extends
> `PersistentReplicator` introduced in geo-replication. The shadow topic list
> is added as a new topic policy of the source topic. Source topic manage the
> lifecycle of all the replicators. The key is to add `shadow_message_id`
> when
> produce message to shadow topics.
>
> ### 2. How shadow topic manage shared ledgers info.
>
> This part is mostly implemented by `ShadowManagedLedger`, which extends
> current `ManagedLedgerImpl` with two key override methods.
>
> 1. `initialize(..)`
> a. Fetch ManagedLedgerInfo of source topic instead of current shadow topic.
>    The source topic name is stored in the topic policy of the shadow topic.
> b. Open the last ledger and read the explicit LAC from bookie, instead of
>    creating new ledger. Reading LAC here requires that the source topic
> must
>    enable explicit LAC feature by set
> `bookkeeperExplicitLacIntervalInMills`
>    to non-zero value in broker.conf.
> c. Do not start checkLedgerRollTask, which tries roll over ledger
> periodically
>
> 2. `internalAsyncAddEntry()` Instead of write entry data to bookie, It only
>    update metadata of ledgers, like `currentLedger`, `lastConfirmedEntry`
> and
>    put the replicated message into `EntryCache`.
>
> Besides, some other problems need to be taken care of.
> - Any ledger metadata updates need to be synced to shadow topic, including
>   ledger offloading or ledger deletion. Shadow topic needs to watch the
> ledger
>   info updates with metadata store and update in time.
> - The local cached LAC of `LedgerHandle` won't updated in time, so we need
>   refresh LAC when a managed cursor requests entries beyond known LAC.
>
> ## Reject Alternatives
>
> See PIP-63[1].
>
> ## Reference
> [1]
> https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> [2] https://github.com/apache/pulsar/pull/11960
> [3] https://github.com/apache/pulsar/issues/16153
>
>
> BR,
> Haiting Jiang
>

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Haiting Jiang <ji...@apache.org>.
Hi Dave,

On 2022/06/23 03:59:35 Dave Fisher wrote:
> 
> On Jun 21, 2022, at 1:00 AM, Haiting Jiang <ji...@apache.org> wrote:
> > 
> > Hi Pulsar community:
> > 
> > I open a pip to discuss "Shadow Topic, an alternative way to support readonly topic ownership."
> > 
> > Proposal Link: https://github.com/apache/pulsar/issues/16153
> > 
> > ---
> > 
> > ## Motivation
> > 
> > The motivation is the same as PIP-63[1], with a new broadcast use case of
> > supporting 100K subscriptions in a single topic.
> > 1. The bandwidth of a broker limits the number of subscriptions for a single
> >   topic.
> > 2. Subscriptions are competing for the network bandwidth on brokers. Different
> >   subscriptions might have different levels of severity.
> > 3. When synchronizing cross-city message reading, cross-city access needs to
> >   be minimized.
> > 4. [New] Broadcast with 100K subscriptions. There is a limitation of the
> >   subscription number of a single topic. It's tested by Hongjie from NTT Lab
> >   that with 40K subscriptions in a single topic, the client needs about 20min
> >   to start all client connections, and under 1 msg/s message producer rate,
> >   the average end to end latency is about 2.9s. And for 100K subscriptions,
> >   the time of start connection and E2E latency is beyond consideration.
> 
> Have you tested performance of two topics each with 40k subscriptions at the same time in the same cluster?
> 
> I think that might simulate the notion of shadow topics in action and see if much performance is actually gained by this notion of splitting.

I have not tested it yet. But as long as the bottleneck in this use case is not the metadata store,
then under the current architecture the number of subscriptions Pulsar can support can
be scaled horizontally.

Also, the per-topic subscription limit can be optimized, as Penghui did in GitHub PRs #16245, #16243, and #16241.

> It seems to me that a better approach would be to have multiple local pulsar clusters and balance the subscriptions between those.

With this approach, we would have to duplicate data storage. This is not tolerable for the other use cases (like 1, 2, 3) when the data flow is quite large.
This is why the original PIP-63 listed it under rejected alternatives, see
https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support#rejected-alternatives

> I’m concerned that this shadow topic approach is adding new complexity to Pulsar without a clear understanding of all of the impacts.
Yes, this is exactly why I prefer this new approach over splitting the original PR #11960 just for easier review.
This approach would be much simpler and have less impact on the current implementation. It would be appreciated if you could point out
some more specific impacts.

Thanks,
Haiting

> Thanks,
> Dave
> 
> > 
> > However, it's too complicated to implement with original PIP-63 proposal, the
> > changed code is already over 3K+ lines, see PR#11960[2], and there are still
> > some problems left,
> > 1. The LAC in readonly topic is updated in a polling pattern, which increases
> >   the bookie load bookie.
> > 2. The message data of readonly topic won't be cached in broker. Increase the
> >   network usage between broker and bookie when there are more than one
> >   subscriber is tail-reading.
> > 3. All the subscriptions is managed in original writable-topic, so the support
> >   max subscription number is not scaleable.
> > 
> > This PIP tries to come up with a simpler solution to support readonly topic
> > ownership and solve the problems the previous PR left. The main idea of this
> > solution is to reuse the feature of geo-replication, but instead of
> > duplicating storage, it shares underlying bookie ledgers between different
> > topics.
> > 
> > ## Goal
> > 
> > The goal is to introduce **Shadow Topic** as a new type of topic to support
> > readonly topic ownership. Just as its name implies, a shadow topic is the
> > shadow of some normal persistent topic (let's call it source topic here). The
> > source topic and the shadow topic must have the same number of partitions or
> > both non-partitioned. Multiply shadow topics can be created from a source
> > topic.
> > 
> > Shadow topic shares the underlying bookie ledgers from its source topic. User
> > can't produce any messages to shadow topic directly and shadow topic don't
> > create any new ledger for messages, all messages in shadow topic come from
> > source topic.
> > 
> > Shadow topic have its own subscriptions and don't share with its source topic.
> > This means the shadow topic have its own cursor ledger to store persistent
> > mark-delete info for each persistent subscriptions.
> > 
> > The message sync procedure of shadow topic is supported by shadow replication,
> > which is very like geo-replication, with these difference:
> > 1. Geo-replication only works between topic with the same name in different
> >   broker clusters. But shadow topic have no naming limitation and they can be
> >   in the same cluster.
> > 2. Geo-replication duplicates data storage, but shadow topic don't.
> > 3. Geo-replication replicates data from each other, it's bidirectional, but
> >   shadow replication only have one way data flow.
> > 
> > 
> > ## API Changes
> > 
> > 1. PulsarApi.proto.
> > 
> > Shadow topic need to know the original message id of the replicated messages,
> > in order to update new ledger and lac. So we need add a `shadow_message_id` in
> > CommandSend for replicator.
> > 
> > ```
> > message CommandSend {
> >     // ...
> >     // message id for shadow topic
> >     optional MessageIdData shadow_message_id = 9;
> > }
> > ```
> > 
> > 2. Admin API for creating shadow topic with source topic
> > ```
> >   admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> > ```
> > 
> > ## Implementation
> > 
> > A picture showing key components relations is added in github issue [3].
> > 
> > There are two key changes for implementation.
> > 1. How to replicate messages to shadow topics.
> > 2. How shadow topic manage shared ledgers info.
> > 
> > ### 1. How to replicate messages to shadow topics. 
> > 
> > This part is mostly implemented by `ShadowReplicator`, which extends
> > `PersistentReplicator` introduced in geo-replication. The shadow topic list
> > is added as a new topic policy of the source topic. Source topic manage the
> > lifecycle of all the replicators. The key is to add `shadow_message_id` when
> > produce message to shadow topics.
> > 
> > ### 2. How a shadow topic manages shared ledger info.
> > 
> > This part is mostly implemented by `ShadowManagedLedger`, which extends the
> > current `ManagedLedgerImpl` and overrides two key methods.
> > 
> > 1. `initialize(..)`
> > a. Fetch the ManagedLedgerInfo of the source topic instead of the current
> >   shadow topic. The source topic name is stored in the topic policy of the
> >   shadow topic.
> > b. Open the last ledger and read the explicit LAC from the bookie, instead of
> >   creating a new ledger. Reading the LAC here requires that the source topic
> >   enables the explicit LAC feature by setting
> >   `bookkeeperExplicitLacIntervalInMills` to a non-zero value in broker.conf.
> > c. Do not start checkLedgerRollTask, which periodically tries to roll over
> >   the ledger.
> > 
> > 2. `internalAsyncAddEntry()`: instead of writing entry data to the bookie, it
> >   only updates ledger metadata, such as `currentLedger` and
> >   `lastConfirmedEntry`, and puts the replicated message into `EntryCache`.
> > 
> > Besides, some other problems need to be taken care of.
> > - Any ledger metadata update needs to be synced to the shadow topic, including
> >  ledger offloading and ledger deletion. The shadow topic needs to watch the
> >  ledger info updates in the metadata store and apply them in time.
> > - The locally cached LAC of `LedgerHandle` won't be updated in time, so we need
> >  to refresh the LAC when a managed cursor requests entries beyond the known LAC.
> > 
> > ## Rejected Alternatives
> > 
> > See PIP-63[1].
> > 
> > ## Reference 
> > [1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> > [2] https://github.com/apache/pulsar/pull/11960 
> > [3] https://github.com/apache/pulsar/issues/16153
> > 
> > 
> > BR,
> > Haiting Jiang
> 
> 
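
The `internalAsyncAddEntry()` behaviour described in the quoted proposal (update ledger metadata and fill the cache, without writing to a bookie) can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ShadowLedgerSketch`, `addReplicatedEntry`, the string-keyed map standing in for `EntryCache`, and the rule that stale or duplicate ids are ignored are all hypothetical and are not taken from the Pulsar code base.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the shadow add-entry path. Hypothetical; not the real ShadowManagedLedger. */
final class ShadowLedgerSketch {
    // Stand-in for EntryCache, keyed by "ledgerId:entryId".
    private final Map<String, byte[]> entryCache = new HashMap<>();
    private long currentLedgerId = -1;
    private long lastConfirmedLedgerId = -1;
    private long lastConfirmedEntryId = -1;

    /**
     * Accepts an entry that the source topic has already persisted.
     * The (ledgerId, entryId) pair plays the role of shadow_message_id.
     * Nothing is written to a bookie; only local metadata and the cache change.
     */
    synchronized boolean addReplicatedEntry(long ledgerId, long entryId, byte[] payload) {
        boolean isNewer = ledgerId > lastConfirmedLedgerId
                || (ledgerId == lastConfirmedLedgerId && entryId > lastConfirmedEntryId);
        if (!isNewer) {
            return false; // assumption of this sketch: stale or duplicate ids are ignored
        }
        currentLedgerId = ledgerId;       // follows the source topic's ledger rollover
        lastConfirmedLedgerId = ledgerId; // advance the locally known last confirmed entry
        lastConfirmedEntryId = entryId;
        entryCache.put(ledgerId + ":" + entryId, payload);
        return true;
    }

    synchronized long currentLedgerId() {
        return currentLedgerId;
    }

    synchronized String lastConfirmedEntry() {
        return lastConfirmedLedgerId + ":" + lastConfirmedEntryId;
    }

    synchronized int cachedEntries() {
        return entryCache.size();
    }
}
```

In this model a ledger rollover on the source topic shows up on the shadow side only as a change of `currentLedgerId`, which is why the replicated message has to carry its original message id.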

Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

Posted by Dave Fisher <wa...@comcast.net>.
On Jun 21, 2022, at 1:00 AM, Haiting Jiang <ji...@apache.org> wrote:
> 
> Hi Pulsar community:
> 
> I open a pip to discuss "Shadow Topic, an alternative way to support readonly topic ownership."
> 
> Proposal Link: https://github.com/apache/pulsar/issues/16153
> 
> ---
> 
> ## Motivation
> 
> The motivation is the same as PIP-63[1], with a new broadcast use case of
> supporting 100K subscriptions in a single topic.
> 1. The bandwidth of a broker limits the number of subscriptions for a single
>   topic.
> 2. Subscriptions are competing for the network bandwidth on brokers. Different
>   subscriptions might have different levels of severity.
> 3. When synchronizing cross-city message reading, cross-city access needs to
>   be minimized.
> 4. [New] Broadcast with 100K subscriptions. The number of subscriptions on a
>   single topic is limited. Hongjie from NTT Lab tested that with 40K
>   subscriptions on a single topic, the client needs about 20min to start all
>   client connections, and at a producer rate of 1 msg/s the average
>   end-to-end latency is about 2.9s. For 100K subscriptions, the connection
>   start time and E2E latency are beyond consideration.

Have you tested performance of two topics each with 40k subscriptions at the same time in the same cluster?

I think that might simulate the notion of shadow topics in action and see if much performance is actually gained by this notion of splitting.

It seems to me that a better approach would be to have multiple local pulsar clusters and balance the subscriptions between those.

I’m concerned that this shadow topic approach is adding new complexity to Pulsar without a clear understanding of all of the impacts.

Thanks,
Dave

> 
> However, the original PIP-63 proposal is too complicated to implement: the
> changed code is already over 3K lines, see PR#11960[2], and some problems
> still remain:
> 1. The LAC in the readonly topic is updated by polling, which increases the
>   load on the bookies.
> 2. The message data of the readonly topic won't be cached in the broker, which
>   increases the network usage between broker and bookie when more than one
>   subscriber is tail-reading.
> 3. All the subscriptions are managed in the original writable topic, so the
>   maximum number of supported subscriptions is not scalable.
> 
> This PIP tries to come up with a simpler solution to support readonly topic
> ownership and solve the problems the previous PR left. The main idea of this
> solution is to reuse the feature of geo-replication, but instead of
> duplicating storage, it shares underlying bookie ledgers between different
> topics.
> 
> ## Goal
> 
> The goal is to introduce **Shadow Topic** as a new type of topic to support
> readonly topic ownership. Just as its name implies, a shadow topic is the
> shadow of some normal persistent topic (let's call it the source topic here).
> The source topic and the shadow topic must have the same number of partitions
> or both be non-partitioned. Multiple shadow topics can be created from a
> source topic.
> 
> A shadow topic shares the underlying bookie ledgers of its source topic. Users
> can't produce messages to a shadow topic directly, and a shadow topic doesn't
> create any new ledgers for messages: all messages in a shadow topic come from
> the source topic.
> 
> A shadow topic has its own subscriptions and doesn't share them with its
> source topic. This means the shadow topic has its own cursor ledger to store
> the persistent mark-delete info for each persistent subscription.
> 
> The message sync procedure of a shadow topic is supported by shadow
> replication, which is very similar to geo-replication, with these differences:
> 1. Geo-replication only works between topics with the same name in different
>   broker clusters. Shadow topics have no naming limitation and can be in the
>   same cluster.
> 2. Geo-replication duplicates data storage, but shadow topics don't.
> 3. Geo-replication is bidirectional: clusters replicate data to each other.
>   Shadow replication has only a one-way data flow.
> 
> 
> ## API Changes
> 
> 1. PulsarApi.proto.
> 
> A shadow topic needs to know the original message id of each replicated
> message in order to update the new ledger and LAC. So we need to add a
> `shadow_message_id` to CommandSend for the replicator.
> 
> ```
> message CommandSend {
>   // ...
>   // message id for shadow topic
>   optional MessageIdData shadow_message_id = 9;
> }
> ```
> 
> 2. Admin API for creating shadow topic with source topic
> ```
>   admin.topics().createShadowTopic(source-topic-name, shadow-topic-name)
> ```
> 
> ## Implementation
> 
> A picture showing the relations between the key components is in the GitHub
> issue [3].
> 
> There are two key changes in the implementation.
> 1. How to replicate messages to shadow topics.
> 2. How a shadow topic manages shared ledger info.
> 
> ### 1. How to replicate messages to shadow topics. 
> 
> This part is mostly implemented by `ShadowReplicator`, which extends
> `PersistentReplicator` introduced in geo-replication. The shadow topic list
> is added as a new topic policy of the source topic. The source topic manages
> the lifecycle of all the replicators. The key is to add `shadow_message_id`
> when producing messages to shadow topics.
> 
> ### 2. How a shadow topic manages shared ledger info.
> 
> This part is mostly implemented by `ShadowManagedLedger`, which extends the
> current `ManagedLedgerImpl` and overrides two key methods.
> 
> 1. `initialize(..)`
> a. Fetch the ManagedLedgerInfo of the source topic instead of the current
>   shadow topic. The source topic name is stored in the topic policy of the
>   shadow topic.
> b. Open the last ledger and read the explicit LAC from the bookie, instead of
>   creating a new ledger. Reading the LAC here requires that the source topic
>   enables the explicit LAC feature by setting
>   `bookkeeperExplicitLacIntervalInMills` to a non-zero value in broker.conf.
> c. Do not start checkLedgerRollTask, which periodically tries to roll over
>   the ledger.
> 
> 2. `internalAsyncAddEntry()`: instead of writing entry data to the bookie, it
>   only updates ledger metadata, such as `currentLedger` and
>   `lastConfirmedEntry`, and puts the replicated message into `EntryCache`.
> 
> Besides, some other problems need to be taken care of.
> - Any ledger metadata update needs to be synced to the shadow topic, including
>  ledger offloading and ledger deletion. The shadow topic needs to watch the
>  ledger info updates in the metadata store and apply them in time.
> - The locally cached LAC of `LedgerHandle` won't be updated in time, so we need
>  to refresh the LAC when a managed cursor requests entries beyond the known LAC.
> 
> ## Rejected Alternatives
> 
> See PIP-63[1].
> 
> ## Reference 
> [1] https://github.com/apache/pulsar/wiki/PIP-63%3A-Readonly-Topic-Ownership-Support
> [2] https://github.com/apache/pulsar/pull/11960 
> [3] https://github.com/apache/pulsar/issues/16153
> 
> 
> BR,
> Haiting Jiang
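
The last point in the quoted implementation notes (refresh the LAC only when a managed cursor asks for entries beyond the known LAC) can be sketched as a lazy refresh, in contrast to the polling pattern criticised in the motivation. This is a hedged illustration, not Pulsar or BookKeeper code: `LacRefreshingReader`, `canRead`, and the `LongSupplier` that stands in for reading the explicit LAC from a bookie are all hypothetical names.

```java
import java.util.function.LongSupplier;

/** Toy model of on-demand LAC refresh. Hypothetical; not the real LedgerHandle. */
final class LacRefreshingReader {
    // Stand-in for a call that reads the explicit LAC from a bookie.
    private final LongSupplier explicitLacReader;
    private long cachedLac;
    private int refreshCount;

    LacRefreshingReader(long initialLac, LongSupplier explicitLacReader) {
        this.cachedLac = initialLac;
        this.explicitLacReader = explicitLacReader;
    }

    /**
     * Returns true if the entry is readable. The remote LAC is consulted
     * only when the request goes past the locally cached value.
     */
    synchronized boolean canRead(long entryId) {
        if (entryId > cachedLac) {
            // Never move the cached LAC backwards, even if the read returns an older value.
            cachedLac = Math.max(cachedLac, explicitLacReader.getAsLong());
            refreshCount++;
        }
        return entryId <= cachedLac;
    }

    synchronized int refreshCount() {
        return refreshCount;
    }
}
```

Reads at or below the cached LAC cost no round trip in this model, so the extra bookie load scales with how often cursors catch up to the tail rather than with a fixed polling interval.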