Posted to dev@pulsar.apache.org by Zixuan Liu <no...@gmail.com> on 2022/04/01 09:22:20 UTC

Re: [DISCUSS] [PIP-150] Support reading the message at the startMessageId position on the broker side

Thanks, Zike,

> Does this PIP change the original behavior of this consumer?

The client still keeps the old code path; I just add a field to the subscribe
command when creating a consumer.

Thanks,
Zixuan

Zike Yang <zi...@apache.org> wrote on Thu, Mar 31, 2022 at 20:04:

> Hi, Zixuan
>
> Thanks for your PIP. I see that you have updated the PIP. Here are my
> thoughts.
>
> > ### Compatibility
> >
> > No breaking changes are introduced here; users can use any client to
> > request any broker.
>
> Do you mean that this is both backward and forward compatible?
>
> Assume that I have the following consumer:
>
> ```
> Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
>         .subscriptionName("my-sub").startMessageIdInclusive()
>         .subscribe();
> ```
>
> Does this PIP change the original behavior of this consumer? If it
> does, this may introduce a breaking change.
>
> It would be better if the compatibility section were described more
> specifically.
>
> Thanks,
> Zike Yang
>
> On Mon, Mar 28, 2022 at 11:37 AM Zixuan Liu <no...@gmail.com> wrote:
> >
> > Thanks Enrico,
> >
> > Good suggestion.
> >
> > > do you mean that you want to start reading from the latest message?
> >
> > Yes.
> >
> > > I suggest adding more messages to the example to make it clearer
> > > (like 1, 2, 3, 4, 5)
> >
> > Added.
> >
> >
> > > I am not sure if this will change the semantics of existing programs.
> >
> > Your concern is valid. I would like to hear more people's views.
> >
> > > Do we have to add a new configuration option?
> >
> > I think we should add a new configuration option here. Do you mean that
> > reading should start from the latest message when using `latest`? If so,
> > the old behavior will be broken.
> >
> >
> >
> > Enrico Olivelli <eo...@gmail.com> wrote on Sat, Mar 26, 2022 at 05:42:
> >
> > > On Fri, Mar 25, 2022 at 17:36 Zixuan Liu <no...@gmail.com> wrote:
> > >
> > > > Hi Pulsar community,
> > > >
> > > > I have created a PIP to support reading the message at the
> > > > `startMessageId` position on the broker side.
> > > >
> > > > The proposal can be found at:
> > > > https://github.com/apache/pulsar/issues/14883
> > > >
> > > > ---------
> > > > ## Motivation
> > > >
> > > > Currently, the Pulsar client supports setting the `startMessageId` for
> > > > Consumer and Reader, and also supports reading the message at the
> > > > `startMessageId` position.
> > > >
> > > > Assume we have two messages, with message IDs 1 and 2:
> > > >
> > > > - When we set `startMessageId` to `earliest`, we get the message with
> > > >   message ID 1
> > > > - When we set `startMessageId` to `latest`, we cannot get any
> > > >   message
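To make the two cases concrete with more messages (IDs 1 to 5, as suggested in the thread), here is a minimal, self-contained Java model of the current behavior. It does not use the Pulsar client: `StartPosition` and `firstMessageRead` are hypothetical names, and message IDs are simplified to plain `long` values.

```java
import java.util.List;
import java.util.Optional;

public class CurrentStartSemantics {

    // Hypothetical stand-ins for MessageId.earliest and MessageId.latest.
    enum StartPosition { EARLIEST, LATEST }

    // Returns the ID of the first already-published message a new reader would
    // receive, or empty if it would receive none of them.
    static Optional<Long> firstMessageRead(StartPosition start, List<Long> existingIds) {
        if (existingIds.isEmpty() || start == StartPosition.LATEST) {
            // Empty topic, or `latest`: reading starts after the newest message,
            // so nothing already in the topic is delivered.
            return Optional.empty();
        }
        // `earliest`: reading starts at the oldest message.
        return Optional.of(existingIds.get(0));
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L, 4L, 5L);
        System.out.println(firstMessageRead(StartPosition.EARLIEST, ids)); // Optional[1]
        System.out.println(firstMessageRead(StartPosition.LATEST, ids));   // Optional.empty
    }
}
```

The motivation is to make the `LATEST` case return the newest existing message (5 in this model) instead of nothing when inclusive reading is requested.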
> > > >
> > > > Sometimes we want the first message read to be the one with message ID 2,
> > >
> > > do you mean that you want to start reading from the latest message?
> > >
> > > I suggest adding more messages to the example to make it clearer
> > > (like 1, 2, 3, 4, 5)
> > >
> > >
> > >
> > > > and there is only one approach on the client side:
> > > >
> > > > ```
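> > > > import java.util.concurrent.TimeUnit;
> > > > import org.apache.pulsar.client.api.Message;
> > > > import org.apache.pulsar.client.api.MessageId;
> > > > import org.apache.pulsar.client.api.Reader;
> > > >
> > > > Reader<byte[]> reader = pulsarClient.newReader()
> > > >         .topic(topicName)
> > > >         .subscriptionName(subscriptionName)
> > > >         .startMessageId(MessageId.latest)
> > > >         .startMessageIdInclusive()
> > > >         .create();
> > > >
> > > > // Workaround: calling hasMessageAvailable() before readNext() makes the
> > > > // reader seek to the latest message (message ID 2).
> > > > reader.hasMessageAvailable();
> > > > Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);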
> > > > ```
> > > >
> > > > Calling `reader.hasMessageAvailable()` before `reader.readNext()` seeks to
> > > > message ID 2, but this approach seems confusing.
> > >
> > >
> > > Agreed
> > >
> > >
> > > > Doing this on the broker side would make things easier.
> > > >
> > > > ## Goal
> > > >
> > > > This PIP proposes support for reading the message at the `startMessageId`
> > > > position on the broker side.
> > > >
> > > > ## Implementation
> > > >
> > > > ### Protocol
> > > >
> > > > Add a `start_message_id_inclusive` field to `CommandSubscribe` to
> > > > determine whether to read the message at the `startMessageId` position:
> > > >
> > > > ```
> > > > message CommandSubscribe {
> > > >     // some fields
> > > >
> > > >     // If true, the subscription starts reading from the message at the
> > > >     // start message ID position (inclusive).
> > > >     optional bool start_message_id_inclusive = 20 [default = false];
> > > > }
> > > > ```
> > > >
> > > > ### ManagedCursorImpl
> > > >
> > > > Add a check in
> > > > `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
> > > >
> > > > We only need to handle the case where `startMessageId` is `MessageId.latest`.
> > > > If `start_message_id_inclusive` is `true`, we use the latest position in the
> > > > ledger as the `readPosition` value; otherwise, if `start_message_id_inclusive`
> > > > is `false`, we use the position after the latest position as the
> > > > `readPosition` value.
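The rule above can be sketched in plain Java. This is a simplified model, not the actual `ManagedCursorImpl` code: the `Position` record, `initialReadPosition`, and the nullable `Boolean` flag (with `null` standing for a client that never sets `start_message_id_inclusive`) are hypothetical, and "next position" is simplified to the next entry ID in the same ledger, whereas the real managed ledger may also need to handle ledger boundaries. Records require Java 16 or later.

```java
public class InclusiveLatestSketch {

    // Hypothetical simplified position: (ledgerId, entryId).
    record Position(long ledgerId, long entryId) {
        // Simplification: the next position is the next entry in the same ledger.
        Position next() {
            return new Position(ledgerId, entryId + 1);
        }
    }

    // Chooses the initial read position for a new subscription whose
    // startMessageId is MessageId.latest. A null flag models a client that does
    // not send start_message_id_inclusive, so the protocol default (false) applies.
    static Position initialReadPosition(Position latestPosition, Boolean startMessageIdInclusive) {
        if (Boolean.TRUE.equals(startMessageIdInclusive)) {
            // Inclusive: the latest existing message is read first.
            return latestPosition;
        }
        // Exclusive (existing behavior): start after the latest message.
        return latestPosition.next();
    }

    public static void main(String[] args) {
        // Hypothetical example: the latest message is entry 4 of ledger 3.
        Position latest = new Position(3L, 4L);
        System.out.println(initialReadPosition(latest, true));  // Position[ledgerId=3, entryId=4]
        System.out.println(initialReadPosition(latest, false)); // Position[ledgerId=3, entryId=5]
        System.out.println(initialReadPosition(latest, null));  // Position[ledgerId=3, entryId=5]
    }
}
```

Treating an absent field as `false` is what keeps existing clients on the old path, which is the compatibility point raised in the thread.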
> > > >
> > >
> > > I am not sure if this will change the semantics of existing programs.
> > >
> > > Do we have to add a new configuration option?
> > >
> > > Otherwise, programs written the way you describe won't work anymore.
> > >
> > > Enrico
> > >
> > >
> > >
> > >
> > > > ---------
> > > >
> > > > Thanks,
> > > > Zixuan
> > > >
> > >
>