Posted to dev@pulsar.apache.org by Anon Hxy <an...@gmail.com> on 2022/07/19 10:00:06 UTC

[DISCUSS] PIP-191: Support batched message using entry filter

Hi Pulsar community,

I have opened a PIP to discuss "Support batched message using entry filter".

Proposal Link: https://github.com/apache/pulsar/issues/16680
---

## Motivation

We already have a pluggable way to filter entries in the broker, namely PIP-105
(https://github.com/apache/pulsar/issues/12269). However, it only checks the
batch header, without looking into the properties of the individual messages.
It does provide an interface to deserialize the entire entry onto the heap,
but this adds memory and CPU overhead, and in most scenarios only a subset of
the properties is needed for filtering.

This proposal makes PIP-105 entry filters support batched entries without
having to deserialize the entire entry onto the heap.


## API Changes

- Add a producer configuration that specifies the property keys whose values
will be copied into the metadata of the batched entry, for example:

```
org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties

```
`batchedFilterProperties` is a `List<String>` whose default value is an
empty list. An empty list means that no properties are added to the entry's
metadata, and the `EntryFilter` will not take effect.

## Implementation

- When `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add` is called,
  we extract the configured message properties and add them to `messageMetadata`:
```
    public boolean add(MessageImpl<?> msg, SendCallback callback) {

        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
                    numMessagesInBatch);
        }

        if (++numMessagesInBatch == 1) {
            try {
                // some properties are common amongst the different messages in the batch, hence we just pick it up from
                // the first message
                messageMetadata.setSequenceId(msg.getSequenceId());
                // copy the configured batch (filter) properties into the batch metadata
                List<KeyValue> filterProperties = getProperties(msg);
                if (!filterProperties.isEmpty()) {
                    messageMetadata.addAllProperties(filterProperties);
                }
                // ... rest of the existing method unchanged
```
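The proposal does not define `getProperties(msg)`. A minimal sketch of what it might compute follows, assuming it only needs the message's properties as a `Map<String, String>` plus the configured `batchedFilterProperties`; the class name `BatchPropertyExtractor` is hypothetical, and `Map.Entry` pairs stand in for Pulsar's `KeyValue` type:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating what getProperties(msg) could compute. */
final class BatchPropertyExtractor {
    private BatchPropertyExtractor() {
    }

    /**
     * Returns the message properties whose keys are listed in
     * batchedFilterProperties, in configuration order. Keys that the message
     * does not carry are skipped; an empty configuration yields an empty list.
     */
    static List<Map.Entry<String, String>> extract(Map<String, String> messageProperties,
                                                   List<String> batchedFilterProperties) {
        List<Map.Entry<String, String>> result = new ArrayList<>();
        for (String key : batchedFilterProperties) {
            String value = messageProperties.get(key);
            if (value != null) {
                result.add(Map.entry(key, value));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> config = List.of("region", "version");
        // "tag" is not configured, so it is dropped
        System.out.println(extract(Map.of("region", "eu", "version", "1", "tag", "a"), config)); // [region=eu, version=1]
        // empty configuration: nothing is copied into the batch metadata
        System.out.println(extract(Map.of("region", "eu"), List.of())); // []
    }
}
```

With an empty result, the `if (!filterProperties.isEmpty())` guard above leaves the batch metadata untouched, which matches the default behavior described for an empty `batchedFilterProperties`.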

- We also need to add a method `hasSameProperties`, analogous to
`hasSameSchema`, so that only messages with the same batch properties are
added to the same batch:

```
    private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
        return batchMessageContainer.haveEnoughSpace(msg)
                && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
                // the batch properties must match independently of the schema check
                && batchMessageContainer.hasSameProperties(msg)
                && batchMessageContainer.hasSameTxn(msg);
    }
```


## Rejected Alternatives

- Implement an `AbstractBatchMessageContainer` subclass, say
`BatchMessagePropertiesBasedContainer`, that keeps messages with the same
properties in a single `HashMap` entry, like `BatchMessageKeyBasedContainer`
does.

Rejection reason: this would publish messages out of order.
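To see why, here is a minimal model of the rejected design, assuming each message is reduced to an (id, batch-properties key) pair; `GroupedFlushDemo` is hypothetical and ignores size limits and flush timers. Even with an insertion-ordered `LinkedHashMap`, the buckets are flushed one after another, so messages from different buckets get reordered:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a properties-keyed batch container. */
final class GroupedFlushDemo {
    private GroupedFlushDemo() {
    }

    /**
     * Each sent message is (messageId, batchPropertiesKey). Messages are
     * bucketed per key, then every bucket is flushed as one batch, bucket by
     * bucket. Returns the resulting publish order of the message ids.
     */
    static List<String> publishOrder(List<Map.Entry<String, String>> sent) {
        Map<String, List<String>> buckets = new LinkedHashMap<>();
        for (Map.Entry<String, String> m : sent) {
            buckets.computeIfAbsent(m.getValue(), k -> new ArrayList<>()).add(m.getKey());
        }
        List<String> order = new ArrayList<>();
        for (List<String> batch : buckets.values()) {
            order.addAll(batch);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> sent = List.of(
                Map.entry("msg1", "region=eu"),
                Map.entry("msg2", "region=us"),
                Map.entry("msg3", "region=eu"));
        System.out.println(publishOrder(sent)); // [msg1, msg3, msg2]
    }
}
```

Sending msg1 (eu), msg2 (us), msg3 (eu) is published as msg1, msg3, msg2. Cutting a new batch whenever the properties change, as `canAddToCurrentBatch` does above, keeps the send order at the cost of smaller batches.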



Thanks,
Xiaoyu Hou

Re: [DISCUSS] PIP-191: Support batched message using entry filter

Posted by Anon Hxy <an...@gmail.com>.
Here is an example that may help in understanding how the properties are extracted:

- Let's set `batchedFilterProperties` = `<region, version>`.
This means that only the keys named `region` and `version` will be extracted
into the batch metadata properties.

- Then a producer sends the messages below, in order:
    - `msg1` with properties: `<region: eu>`
    - `msg2` with properties: `<region: eu>`
    - `msg3` with properties: `<region: eu, version:1, tag:a>`
    - `msg4` with properties: `<region: eu, version:1>`
    - `msg5` with properties: `<region: us, version:1>`
    - `msg6` with properties: `<region: us, version:2>`

- The properties extraction proceeds as follows:
    - msg1 and msg2 have the same batch properties, <region: eu>, so they are
put into the same batch.
    - msg3 starts a new batch, because its batch properties
<region: eu, version:1> differ from those of msg2 (which has no version).
tag:a in msg3 is ignored because `batchedFilterProperties` doesn't contain
`tag`. msg4 has the same batch properties as msg3, so msg3 and msg4 are put
into the same batch.
    - msg5 and msg6 have different batch properties, because their `version`
values differ (and msg5's `region` also differs from msg4's). So msg5 and msg6
are published in different batches.

- To summarize, the result will be:

| batch  | batch meta properties   | 1st msg: single meta properties | 1st msg: payload | 2nd msg: single meta properties | 2nd msg: payload |
|--------|-------------------------|---------------------------------|------------------|---------------------------------|------------------|
| batch1 | <region: eu>            | <region: eu>                    | msg1             | <region: eu>                    | msg2             |
| batch2 | <region: eu, version:1> | <region: eu, version:1, tag:a>  | msg3             | <region: eu, version:1>         | msg4             |
| batch3 | <region: us, version:1> | <region: us, version:1>         | msg5             |                                 |                  |
| batch4 | <region: us, version:2> | <region: us, version:2>         | msg6             |                                 |                  |
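The summary above can be reproduced with a small simulation. It is a sketch that assumes only the rule described in this thread (start a new batch whenever the configured properties differ from those of the current batch, comparing both keys and values) and ignores batch size and time limits; `BatchingExample` and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical simulation of properties-aware sequential batching. */
final class BatchingExample {
    private BatchingExample() {
    }

    /** True if a and b agree on every configured key; a missing key counts as a distinct value. */
    private static boolean sameBatchProperties(Map<String, String> a, Map<String, String> b, List<String> keys) {
        for (String key : keys) {
            if (!Objects.equals(a.get(key), b.get(key))) {
                return false;
            }
        }
        return true;
    }

    /** Assigns messages, in send order, to batches; a property change closes the current batch. */
    static List<List<String>> batch(List<String> ids, List<Map<String, String>> props, List<String> keys) {
        List<List<String>> batches = new ArrayList<>();
        Map<String, String> current = null; // properties of the first message in the open batch
        for (int i = 0; i < ids.size(); i++) {
            if (current == null || !sameBatchProperties(current, props.get(i), keys)) {
                batches.add(new ArrayList<>());
                current = props.get(i);
            }
            batches.get(batches.size() - 1).add(ids.get(i));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("region", "version");
        List<String> ids = List.of("msg1", "msg2", "msg3", "msg4", "msg5", "msg6");
        List<Map<String, String>> props = List.of(
                Map.of("region", "eu"),
                Map.of("region", "eu"),
                Map.of("region", "eu", "version", "1", "tag", "a"),
                Map.of("region", "eu", "version", "1"),
                Map.of("region", "us", "version", "1"),
                Map.of("region", "us", "version", "2"));
        System.out.println(batch(ids, props, keys)); // [[msg1, msg2], [msg3, msg4], [msg5], [msg6]]
    }
}
```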

Thanks,
Xiaoyu Hou

In reply to Anon Hxy <an...@gmail.com>, Tue, Jul 19, 2022 at 18:00.

Re: [DISCUSS] PIP-191: Support batched message using entry filter

Posted by Anon Hxy <an...@gmail.com>.
Hi Enrico,

Thanks for your reply.

> 1) It is not clear to me if you want to push all the messages metadata
> in the main header of the entry or only the metadata of the first
> entry, or only the KEY.

I only want to copy a subset of the message properties (we call them batch
properties here) into the main header of the entry, without modifying the
payload or the single message metadata. All messages in one batch must have
the same batch properties (both keys and values).
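Because every message in a batch shares the same batch properties, a broker-side filter could decide for the whole entry from the main header alone. A minimal sketch, assuming the header properties are already available as a `Map<String, String>`; `HeaderPropertyFilter` is hypothetical, its `FilterResult` only mirrors the accept/reject idea of PIP-105 rather than the real broker interface, and accepting entries that lack the key is an assumption of this sketch, not part of the proposal:

```java
import java.util.Map;

/** Hypothetical filter that decides per entry using only main-header properties. */
final class HeaderPropertyFilter {
    enum FilterResult { ACCEPT, REJECT }

    private final String key;
    private final String expectedValue;

    HeaderPropertyFilter(String key, String expectedValue) {
        this.key = key;
        this.expectedValue = expectedValue;
    }

    /**
     * No payload deserialization: the header value applies to every message
     * in the batch. Assumption: if the header lacks the key (for example,
     * batchedFilterProperties was empty), the entry is accepted unfiltered.
     */
    FilterResult filter(Map<String, String> entryHeaderProperties) {
        String value = entryHeaderProperties.get(key);
        if (value == null) {
            return FilterResult.ACCEPT;
        }
        return value.equals(expectedValue) ? FilterResult.ACCEPT : FilterResult.REJECT;
    }

    public static void main(String[] args) {
        HeaderPropertyFilter euOnly = new HeaderPropertyFilter("region", "eu");
        System.out.println(euOnly.filter(Map.of("region", "eu", "version", "1"))); // ACCEPT
        System.out.println(euOnly.filter(Map.of("region", "us", "version", "2"))); // REJECT
    }
}
```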

> 2) we have to clarify the new message format of the entry and update
> the protocol documents

It seems that the protocol documentation does not describe the main header of
a batch message; it only describes `SingleMessageMetadata`. I agree that it
would be better to update the protocol documentation to describe the batch
message main header.

> 3) existing EntryFilters won't be able to work well with the new
> format, we must find a way to make them fail and not process garbage

It seems that there are no compatibility problems: I only add properties to
the main header, which was previously empty, and the payload and single
message metadata are not modified. Existing EntryFilters will continue to work
with the new format, as will existing Protocol Handlers. :)

Thanks
Xiaoyu Hou

In reply to Enrico Olivelli <eo...@gmail.com>, Thu, Jul 21, 2022 at 20:27.

Re: [DISCUSS] PIP-191: Support batched message using entry filter

Posted by Enrico Olivelli <eo...@gmail.com>.
Thank you for this proposal!

I understand the problem, and I have already thought about it, because
I am the author of some filters (notably the JMS Selectors Filter),
but we need to clarify some more aspects of this PIP.

1) It is not clear to me whether you want to push the metadata of all the
messages into the main header of the entry, only the metadata of the first
entry, or only the KEY.
2) We have to clarify the new message format of the entry and update
the protocol documentation.
3) Existing EntryFilters won't be able to work well with the new
format; we must find a way to make them fail rather than process garbage.
4) The same problem applies to Interceptors and Protocol Handlers
(like KoP): this PIP must make the upgrade path clear and give some
suggestions to the developers of such components.

I support this initiative, as long as we clarify those points.

The Pulsar ecosystem is becoming more and more mature and the user
base is growing quickly, so we can't break things without a clear path
for users and for the developers of extensions.

Enrico


In reply to Qiang Huang <qi...@gmail.com>, Thu, Jul 21, 2022 at 12:45.

Re: [DISCUSS] PIP-191: Support batched message using entry filter

Posted by Qiang Huang <qi...@gmail.com>.
Good. +1

In reply to Anon Hxy <an...@gmail.com>, Tue, Jul 19, 2022 at 18:00.

-- 
BR,
Qiang Huang