Posted to dev@pulsar.apache.org by Zixuan Liu <no...@gmail.com> on 2022/04/06 08:54:05 UTC
[VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Hi Pulsar community,
Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
Thanks,
Zixuan
-----
Discussion thread:
https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
## Motivation
Currently, the Pulsar client supports setting a `startMessageId` for
Consumer and Reader, and it also supports reading the message at the
`startMessageId` position.
Assume the topic holds four messages with IDs 1, 2, 3, and 4:
- When `startMessageId` is set to `earliest`, we get the message with ID 1.
- When `startMessageId` is set to `latest`, we get no message.
Sometimes we want the first read to return the message with ID 4. The
client currently offers only one way to do this:
```
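// Requires org.apache.pulsar.client.api.{Message, MessageId, Reader}
// and java.util.concurrent.TimeUnit.
Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .startMessageId(MessageId.latest)
        .startMessageIdInclusive()
        .create();

// With startMessageIdInclusive() enabled, this call includes a seek,
// so the following readNext() returns the message with ID 4.
reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);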
```
Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
the correct message (ID 4), because `hasMessageAvailable()` includes a seek
when `startMessageIdInclusive()` is enabled.
This approach is confusing. Handling it on the broker side would make
things easier.
## Goal
This PIP proposes supporting reads of the message at the `startMessageId`
position on the broker side:
- Add to `Consumer`
- Add to `Reader`
## Implementation
### Protocol
Add a `start_message_id_inclusive` field to `CommandSubscribe` to
determine whether to read the message at the `startMessageId` position:
```
message CommandSubscribe {
    // some fields

    // If specified, the subscription will read the message from the start
    // message id position.
    optional bool start_message_id_inclusive = 20 [default = false];
}
```
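To make the default concrete, here is a minimal Java sketch of how a broker could interpret the field when an older client omits it. `SubscribeFlagSketch` and `effectiveInclusive` are hypothetical names, and `Optional<Boolean>` merely stands in for an optional protobuf field; this is not Pulsar's generated protocol code.

```java
import java.util.Optional;

// Illustrative only: models `optional bool ... [default = false]`.
final class SubscribeFlagSketch {
    // An absent field (older client) reads as false, i.e. exclusive start.
    static boolean effectiveInclusive(Optional<Boolean> fieldOnWire) {
        return fieldOnWire.orElse(false);
    }

    public static void main(String[] args) {
        // Older client that never sets the field.
        System.out.println(effectiveInclusive(Optional.empty()));
        // Newer client that requests an inclusive start.
        System.out.println(effectiveInclusive(Optional.of(true)));
    }
}
```

Because the default is `false`, a client that does not know the field keeps the current exclusive behavior.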
### ManagedCursorImpl
Add a check in
`org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
The only case we need to handle is when `startMessageId` is
`MessageId.latest`: if `start_message_id_inclusive` is `true`, use the
latest position in the ledger as the `readPosition`; otherwise, if
`start_message_id_inclusive` is `false`, use the position after the
latest position as the `readPosition`.
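The rule for `MessageId.latest` can be sketched in a few lines of Java. This is a simplified model, not the actual `ManagedCursorImpl` code: positions are represented as plain `long` entry ids, `ReadPositionSketch` and `initialReadPosition` are hypothetical names, and the empty-topic case is deliberately left out.

```java
// Illustrative only: positions are modeled as plain entry ids.
final class ReadPositionSketch {
    // Returns the entry id the cursor would read first when the
    // subscription starts at "latest".
    static long initialReadPosition(long lastEntryId, boolean startMessageIdInclusive) {
        // Inclusive: start at the last entry itself.
        // Exclusive: start at the position after it, so only new messages are read.
        return startMessageIdInclusive ? lastEntryId : lastEntryId + 1;
    }

    public static void main(String[] args) {
        long lastEntryId = 4; // topic holds messages 1..4, as in the Motivation example
        System.out.println(initialReadPosition(lastEntryId, true));
        System.out.println(initialReadPosition(lastEntryId, false));
    }
}
```

With the four-message topic from the Motivation section, the inclusive case points the cursor at message 4, while the exclusive case points past it.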
### Client
The `Consumer` and `Reader` support setting the
`start_message_id_inclusive` value in the `CommandSubscribe` command.
### Compatibility
This feature is both backward and forward compatible, which means
users can use any client version with any broker version.
Note that users can still read the message at the latest position by
calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
call can be omitted when using the new client with the new broker.
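The mixed-version case raised later in this thread (a new client talking to a broker that does not understand `start_message_id_inclusive`) could be reasoned about roughly as follows. This is a sketch under assumptions: `InclusiveStartCompat`, `Strategy`, and `choose` are hypothetical names, and how the client learns whether the broker supports the field (for example, from the protocol version) is not specified by the PIP.

```java
// Illustrative only: the names below are hypothetical, not Pulsar client APIs.
final class InclusiveStartCompat {
    enum Strategy { NOT_NEEDED, BROKER_SIDE, CLIENT_SIDE_SEEK }

    static Strategy choose(boolean inclusiveRequested, boolean brokerSupportsField) {
        if (!inclusiveRequested) {
            // Exclusive start: nothing special to do on either side.
            return Strategy.NOT_NEEDED;
        }
        // A new broker positions the cursor itself; an old broker ignores the
        // field, so the existing hasMessageAvailable() seek is still required.
        return brokerSupportsField ? Strategy.BROKER_SIDE : Strategy.CLIENT_SIDE_SEEK;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true));
        System.out.println(choose(true, false));
        System.out.println(choose(false, true));
    }
}
```

Keeping the client-side path available is what lets existing code that calls `hasMessageAvailable()` first continue to work unchanged.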
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Zike Yang <zi...@apache.org>.
+1 (non-binding)
Thanks,
Zike
On Wed, Apr 27, 2022 at 10:44 AM Lin Lin <li...@apache.org> wrote:
>
> +1
>
> Lin Lin
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Lin Lin <li...@apache.org>.
+1
Lin Lin
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by mattison chao <ma...@gmail.com>.
+1 (non-binding)
Thanks,
Mattison
On Mon, 11 Apr 2022 at 13:06, Haiting Jiang <ji...@apache.org> wrote:
> +1
>
> Thanks,
> Haiting
>
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Haiting Jiang <ji...@apache.org>.
+1
Thanks,
Haiting
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Zixuan Liu <no...@gmail.com>.
Thanks for your votes! The vote is closed with 3 binding +1 votes and no -1 votes.
Thanks,
Zixuan
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Zixuan Liu <no...@gmail.com>.
For old versions that don't support `start_message_id_inclusive`, should I
fix this on the client side?
Do you mean calling the `hasMessageAvailable()` method in the new consumer
and reader?
On Mon, Apr 11, 2022 at 09:10, PengHui Li <pe...@apache.org> wrote:
> Hi zixuan,
>
> The proposal looks good.
> Regarding compatibility, should we check the protocol version on the
> client side?
> The old version doesn't support `start_message_id_inclusive`, which
> means the client side still needs to do the seek operation when
> requesting an old-version broker.
>
> > Notice that the users still can read the message of the latest position
> > by call `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> > call can be ignored when using the new client and the new broker.
>
> For this part, I don't think it should be a notice. We are not deliberately
> modifying the current behavior, just fixing the problem for users who don't
> want to call `reader.hasMessageAvailable()` because they know the topic has
> at least one message. With this notice added, it looks as if, after this
> proposal, we no longer suggest using `hasMessageAvailable()`.
>
> Penghui
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Hang Chen <ch...@apache.org>.
+1
Thanks,
Hang
On Mon, Apr 11, 2022 at 18:30, Zixuan Liu <no...@gmail.com> wrote:
>
> This way is right.
>
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by Zixuan Liu <no...@gmail.com>.
This way is right.
On Mon, Apr 11, 2022 at 09:12, PengHui Li <pe...@apache.org> wrote:
> > regarding the compatibility, should we check the protocol version at the
> > client side?
> > The old version doesn't support `start_message_id_inclusive`, which
> > means the client side still needs to do the seek operation while
> > requesting an old version broker.
>
> Oh sorry, my fault. If the broker side does not support
> `start_message_id_inclusive`, users need to call
> `reader.hasMessageAvailable()` first.
>
> +1 for the proposal.
>
> Penghui
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by PengHui Li <pe...@apache.org>.
> regarding the compatibility, should we check the protocol version at the
> client side?
> The old version doesn't support `start_message_id_inclusive`, which
> means the client side still needs to do the seek operation while
> requesting an old version broker.
Oh sorry, my fault. If the broker side does not support
`start_message_id_inclusive`, users need to call
`reader.hasMessageAvailable()` first.
+1 for the proposal.
Penghui
Re: [VOTE] [PIP-150] Support read the message of startMessageId position on the broker side
Posted by PengHui Li <pe...@apache.org>.
Hi zixuan,
The proposal looks good.
Regarding compatibility, should we check the protocol version on the
client side?
The old version doesn't support `start_message_id_inclusive`, which
means the client side still needs to do the seek operation when
requesting an old-version broker.
> Notice that the users still can read the message of the latest position by
> call `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.
For this part, I don't think it should be a notice. We are not deliberately
modifying the current behavior, just fixing the problem for users who don't
want to call `reader.hasMessageAvailable()` because they know the topic has
at least one message. With this notice added, it looks as if, after this
proposal, we no longer suggest using `hasMessageAvailable()`.
Penghui