Posted to dev@pulsar.apache.org by Zixuan Liu <no...@gmail.com> on 2022/04/06 08:54:05 UTC

[VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Hi Pulsar community,

Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883

Thanks,
Zixuan

-----

Discussion thread:
https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl

## Motivation

Currently, the Pulsar client supports setting the `startMessageId` for
Consumer and Reader, and also supports reading the message at the
`startMessageId` position.

Assume we have four messages with IDs 1, 2, 3, and 4 in the topic:

- When we set `earliest` as the `startMessageId` value, we get the message
  with ID 1.
- When we set `latest` as the `startMessageId` value, we can't get any
  message.

Sometimes we want the first read to return the message with ID 4. Currently
there is only one way to do this in the client:

```
Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create();

reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
```

Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
the correct message (ID 4), because `hasMessageAvailable()` includes a seek
action when `startMessageIdInclusive()` is enabled.

This approach is confusing. Handling this on the broker side would make
things easier.

## Goal

This PIP proposes support for reading the message at the `startMessageId`
position on the broker side:

- Add to `Consumer`
- Add to `Reader`

## Implementation

### Protocol

Add a `start_message_id_inclusive` field to `CommandSubscribe` to
determine whether to read the message at the `startMessageId` position:

```
message CommandSubscribe {
    // some fields

    // If specified, the subscription will read the message from the start
    // message id position.
    optional bool start_message_id_inclusive = 20 [default = false];
}
```

### ManagedCursorImpl

Add a check in
`org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.


We only need to handle the case where `startMessageId` is
`MessageId.latest`. If `start_message_id_inclusive` is `true`, we use the
latest position in the ledger as the `readPosition` value; otherwise, if
`start_message_id_inclusive` is `false`, we use the position after the
latest position as the `readPosition` value.
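
As a rough illustration of the rule above (not the actual
`ManagedCursorImpl` code), the choice of read position could be sketched as
follows. The class and method names are hypothetical, and positions are
simplified to a single entry id, whereas real positions also carry a ledger
id.

```java
/**
 * Sketch only: positions are simplified to a single entry id. Real
 * positions are (ledgerId, entryId) pairs, and this class is hypothetical.
 */
final class ReadPositionSketch {

    /**
     * Returns the entry id the cursor starts reading from when the
     * subscription starts at MessageId.latest.
     *
     * @param latestEntryId           entry id of the newest message in the ledger
     * @param startMessageIdInclusive value of start_message_id_inclusive
     */
    static long initialReadEntryId(long latestEntryId, boolean startMessageIdInclusive) {
        // Inclusive: start at the newest existing message.
        // Exclusive: start at the position right after it.
        return startMessageIdInclusive ? latestEntryId : latestEntryId + 1;
    }

    public static void main(String[] args) {
        // Topic holds entries 1..4, as in the Motivation example.
        System.out.println(initialReadEntryId(4, true));   // prints 4
        System.out.println(initialReadEntryId(4, false));  // prints 5
    }
}
```

With the four-message topic from the Motivation section, the inclusive case
starts at message 4, while the exclusive case starts after it and therefore
returns nothing until a new message is published.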

### Client

The `Consumer` and `Reader` support setting the
`start_message_id_inclusive` value in the `CommandSubscribe` command.

### Compatibility

This feature is both backward and forward compatible, which means users can
use any client with any broker.

Note that users can still read the message at the latest position by
calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
call can be omitted when using the new client and the new broker.
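
To make the compatibility matrix concrete, here is a small sketch of when
the extra `hasMessageAvailable()` call is still needed. The helper and its
parameter names are hypothetical and not part of the Pulsar client API; it
only encodes the rule stated above, assuming the broker-side path applies
only when both the client and the broker understand the new field.

```java
/** Hypothetical helper; not part of the Pulsar client API. */
final class InclusiveLatestSketch {

    /**
     * Returns true when the caller should still invoke hasMessageAvailable()
     * before readNext() to receive the message at the latest position.
     */
    static boolean needsHasMessageAvailableCall(boolean startAtLatest,
                                                boolean inclusive,
                                                boolean clientSendsField,
                                                boolean brokerHonorsField) {
        if (!startAtLatest || !inclusive) {
            // The caller did not ask for the message at the latest position.
            return false;
        }
        // Both sides must be new for the broker-side path to apply.
        return !(clientSendsField && brokerHonorsField);
    }
}
```

Under these assumptions, only the new-client and new-broker combination
returns `false` for an inclusive `latest` reader; every other combination
keeps the existing client-side behavior.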

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Zike Yang <zi...@apache.org>.
+1 (non-binding)

Thanks,
Zike

On Wed, Apr 27, 2022 at 10:44 AM Lin Lin <li...@apache.org> wrote:
>
> +1
>
> Lin Lin

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Lin Lin <li...@apache.org>.
+1

Lin Lin

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by mattison chao <ma...@gmail.com>.
+1 (non-binding)

Thanks,
Mattison

On Mon, 11 Apr 2022 at 13:06, Haiting Jiang <ji...@apache.org> wrote:

> +1
>
> Thanks,
> Haiting

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Haiting Jiang <ji...@apache.org>.
+1 

Thanks,
Haiting

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Zixuan Liu <no...@gmail.com>.
Thanks for your votes! The vote is closed with 3 binding +1 votes and no -1
votes.

Thanks,
Zixuan

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Zixuan Liu <no...@gmail.com>.
For an old version that doesn't support `start_message_id_inclusive`,
should I fix this on the client side?

Do you mean calling the `hasMessageAvailable` method in the new consumer
and reader?


On Mon, Apr 11, 2022 at 09:10, PengHui Li <pe...@apache.org> wrote:

> Hi Zixuan,
>
> The proposal looks good. Regarding compatibility, should we check the
> protocol version on the client side? The old version doesn't support
> `start_message_id_inclusive`, which means the client side still needs to
> do the seek operation when connecting to an old-version broker.
>
> > Notice that the users still can read the message of the latest position
> > by call `reader.hasMessageAvailable()` before `reader.readNext()`, but
> > this call can be ignored when using the new client and the new broker.
>
> For this part, I don't think it should be a notice. We are not
> deliberately modifying the current behavior; we are just fixing the
> problem for users who don't want to call `reader.hasMessageAvailable()`
> because they know the topic has at least one message. With the notice
> added here, it looks as if, after this proposal, we no longer suggest
> using `hasMessageAvailable()`.
>
> Penghui

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Hang Chen <ch...@apache.org>.
+1

Thanks,
Hang

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by Zixuan Liu <no...@gmail.com>.
This way is right.

On Mon, Apr 11, 2022 at 09:12, PengHui Li <pe...@apache.org> wrote:

> > Regarding compatibility, should we check the protocol version on the
> > client side? The old version doesn't support
> > `start_message_id_inclusive`, which means the client side still needs
> > to do the seek operation when connecting to an old-version broker.
>
> Oh sorry, my mistake. If the broker side does not support
> `start_message_id_inclusive`, users need to call
> `reader.hasMessageAvailable()` first.
>
> +1 for the proposal.
>
> Penghui

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by PengHui Li <pe...@apache.org>.
> Regarding compatibility, should we check the protocol version on the
> client side? The old version doesn't support `start_message_id_inclusive`,
> which means the client side still needs to do the seek operation when
> connecting to an old-version broker.

Oh sorry, my mistake. If the broker side does not support
`start_message_id_inclusive`, users need to call
`reader.hasMessageAvailable()` first.

+1 for the proposal.

Penghui

On Mon, Apr 11, 2022 at 9:10 AM PengHui Li <pe...@apache.org> wrote:

> Hi zixuan,
>
> The proposal looks good,
> regarding the compatibility, should we check the protocol version at the
> client side?
> The old version version doesn't support `start_message_id_inclusive` which
> means the
> client side still needs to do the seek operation while requesting an old
> version broker.
>
> > Notice that the users still can read the message of the latest position
>  by
> call `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.
>
> For this part, I don't think it should be a notice, We do not deliberately
> modify current behavior,
> just fix the problem if users don't want to call
> reader.hasMessageAvailable() because they know
> topic has at least one message. Add notice here, it looks like after this
> proposal, we are not
> suggesting to use hasMessageAvailable() any more.
>
> Penghui
>
> On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <no...@gmail.com> wrote:
>
>> Hi Pulsar community,
>>
>> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
>>
>> Thanks,
>> Zixuan
>>
>> -----
>>
>> Discussion thread:
>> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
>>
>> ## Motivation
>>
>> Currently, the Pulsar-client supports setting the `startMessageId` for
>> Consumer and Reader, and also supports reading the message of
>> `startMessageId` position.
>>
>> Assume, we have two message id 1,2,3,4 in the topic:
>>
>> - When we set `earliest` as `startMessageId` value, we can get the message
>> of message id 1
>> - When we set `latest` as `startMessageId` value, we can't get any message
>>
>>  Sometimes we want to read the message id 4 for the first time, we have
>> only one approach in client:
>>
>> ```
>>  Reader<byte[]> reader = pulsarClient.newReader()
>>                 .topic(topicName)
>>                 .subscriptionName(subscriptionName)
>>                 .startMessageId(MessageId.latest)
>>                 .startMessageIdInclusive()
>>                 .create();
>>
>> reader.hasMessageAvailable();
>> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
>> ```
>>
>> Call `reader.hasMessageAvailable()` before `reader.readNext()` can get the
>> correct message id 4, which include seek action when the
>> `startMessageIdInclusive()` is enabled.
>>
>> This approach is confusing.   If we do this on the broker side, it will
>> make things easier.
>>
>> ## Goal
>>
>> This PIP proposes support for reading the message of `startMessageId`
>> position on the broker side:
>>
>> - Add to `Consumer`
>> - Add to `Reader`
>>
>> ## Implementation
>>
>> ### Protocol
>>
>> Add a `start_message_id_inclusive` field to `CommandSubscribe` for
>> determine whether to read the message of `startMessageId` position:
>>
>> ```
>> message CommandSubscribe {
>>     // some fields
>>
>>     // If specified, the subscription will read the message from the start
>> message id position.
>>     optional bool start_message_id_inclusive = 20 [default = false];
>> }
>> ```
>>
>> ### ManagedCursorImpl
>>
>> Add a check in
>>
>> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>>
>>
>> We only need to care about the case where `startMessageId` is
>> `MessageId.latest`: if `start_message_id_inclusive` is `true`, we take the
>> latest position from the ledger as the `readPosition` value; otherwise, if
>> `start_message_id_inclusive` is `false`, we take the next position after
>> the latest position as the `readPosition` value.
>>
>> ### Client
>>
>> The `Consumer` and `Reader` support setting the
>> `start_message_id_inclusive` value in the `CommandSubscribe` command.
>>
>> ### Compatibility
>>
>> This feature is both backward and forward compatible, which means
>> users can use any client with any broker.
>>
>> Notice that users can still read the message at the latest position by
>> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
>> call can be ignored when using the new client and the new broker.
>>
>

Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side

Posted by PengHui Li <pe...@apache.org>.
Hi Zixuan,

The proposal looks good.
Regarding compatibility, should we check the protocol version on the
client side?
An old broker version doesn't support `start_message_id_inclusive`, which
means the client still needs to do the seek operation when talking to an
old broker.
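
A minimal sketch of what such a client-side check could look like. The
class name, method name, and version numbers are hypothetical and are not
taken from the PIP or from the Pulsar client code; the sketch only assumes
the client learns the broker's protocol version when it connects.

```java
public final class InclusiveStartCheck {

    /**
     * Decide whether the client must keep doing the seek itself.
     *
     * @param brokerProtocolVersion   protocol version reported by the broker
     * @param minVersionWithInclusive hypothetical first protocol version in
     *                                which the broker honors
     *                                start_message_id_inclusive
     * @param startMessageIdInclusive whether the user enabled
     *                                startMessageIdInclusive()
     */
    static boolean needsClientSideSeek(int brokerProtocolVersion,
                                       int minVersionWithInclusive,
                                       boolean startMessageIdInclusive) {
        // Only inclusive reads against a broker that predates the field
        // need the client-side fallback.
        return startMessageIdInclusive
                && brokerProtocolVersion < minVersionWithInclusive;
    }

    public static void main(String[] args) {
        // The version numbers below are placeholders for illustration only.
        System.out.println(needsClientSideSeek(19, 20, true));   // old broker
        System.out.println(needsClientSideSeek(20, 20, true));   // new broker
        System.out.println(needsClientSideSeek(19, 20, false));  // not inclusive
    }
}
```

With a check like this, a new client talking to an old broker would keep
the existing seek path, and only skip it when the broker is known to
handle the field.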

> Notice that users can still read the message at the latest position by
> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.

For this part, I don't think it should be a notice. We are not deliberately
changing the current behavior, just fixing the problem for users who don't
want to call `reader.hasMessageAvailable()` because they know the topic has
at least one message. With the notice here, it looks like after this
proposal we no longer suggest using `hasMessageAvailable()`.

Penghui

On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <no...@gmail.com> wrote:

> Hi Pulsar community,
>
> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
>
> Thanks,
> Zixuan
>
> -----
>
> Discussion thread:
> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
>
> ## Motivation
>
> Currently, the Pulsar-client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message of
> `startMessageId` position.
>
> Assume we have four messages with ids 1, 2, 3, 4 in the topic:
>
> - When we set `earliest` as `startMessageId` value, we can get the message
> of message id 1
> - When we set `latest` as `startMessageId` value, we can't get any message
>
> Sometimes we want to read message id 4 on the first read. For that, we
> have only one approach in the client:
>
> ```
>  Reader<byte[]> reader = pulsarClient.newReader()
>                 .topic(topicName)
>                 .subscriptionName(subscriptionName)
>                 .startMessageId(MessageId.latest)
>                 .startMessageIdInclusive()
>                 .create();
>
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
>
> Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
> the correct message id 4, because `hasMessageAvailable()` includes a seek
> action when `startMessageIdInclusive()` is enabled.
>
> This approach is confusing. If we do this on the broker side, it will
> make things easier.
>
> ## Goal
>
> This PIP proposes support for reading the message of `startMessageId`
> position on the broker side:
>
> - Add to `Consumer`
> - Add to `Reader`
>
> ## Implementation
>
> ### Protocol
>
> Add a `start_message_id_inclusive` field to `CommandSubscribe` to
> determine whether to read the message at the `startMessageId` position:
>
> ```
> message CommandSubscribe {
>     // some fields
>
>     // If specified, the subscription will read the message from the start
>     // message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
>
> ### ManagedCursorImpl
>
> Add a check in
>
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>
>
> We only need to care about the case where `startMessageId` is
> `MessageId.latest`: if `start_message_id_inclusive` is `true`, we take the
> latest position from the ledger as the `readPosition` value; otherwise, if
> `start_message_id_inclusive` is `false`, we take the next position after
> the latest position as the `readPosition` value.
>
> ### Client
>
> The `Consumer` and `Reader` support setting the
> `start_message_id_inclusive` value in the `CommandSubscribe` command.
>
> ### Compatibility
>
> This feature is both backward and forward compatible, which means
> users can use any client with any broker.
>
> Notice that users can still read the message at the latest position by
> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.
>