Posted to dev@kafka.apache.org by Talat Uyarer <tu...@paloaltonetworks.com> on 2021/11/29 18:48:53 UTC

Filtering support on Fetch API

Hi All,

I want to get your advice on one subject: I want to create a KIP for
message-header-based filtering on the Fetch API.

Our current use case: we have 1k+ topics and, per topic, 10+ consumers
for different use cases. However, each consumer is interested in a different
set of messages on the same topic. Currently we read all messages from a
given topic and drop logs on the consumer side. To reduce our stream
processing cost I want to drop logs on the broker side. My understanding
of the current flow is:

*Broker sends messages as is (no serialization cost) -> Network transfer ->
Consumer deserializes messages (user-side deserialization cost) -> User space
drops or uses messages (user-side filtering cost)*


If I can drop messages based on their headers without serializing and
deserializing them, it will help us save network bandwidth as well as
consumer-side CPU cost.
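For illustration, this is roughly what the consumer-side drop looks like
today with the standard Java client (the topic, group, and "log-type" header
below are just examples from our setup, nothing standardized):

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "traffic-log-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(List.of("logs"));
        while (true) {
            for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofMillis(500))) {
                Header h = rec.headers().lastHeader("log-type");
                // Every record has already crossed the network before this check runs.
                if (h == null
                        || !"traffic".equals(new String(h.value(), StandardCharsets.UTF_8))) {
                    continue; // user-space drop: network and fetch cost already paid
                }
                // ... process the matching record ...
            }
        }
    }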

My approach is to build a header index. Consumer clients will define
their filter in the fetch call, and the broker will send only the messages
that match the filter. I would like to hear your suggestions about my solution.
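To make the proposal concrete, here is a rough sketch of the consumer-facing
shape I have in mind; the HeaderFilter type and the subscribe() overload are
hypothetical, nothing like them exists in Kafka today:

    import java.nio.charset.StandardCharsets;

    // Hypothetical sketch only; no such API exists in Kafka today.
    // The consumer declares a header predicate once, the client attaches it to
    // every Fetch request, and the broker skips data whose header index has no
    // matching value.

    /** Hypothetical filter carried in the Fetch request: exact match on one header. */
    record HeaderFilter(String headerKey, byte[] headerValue) {}

    HeaderFilter filter =
        new HeaderFilter("log-type", "traffic".getBytes(StandardCharsets.UTF_8));
    // consumer.subscribe(List.of("logs"), filter);  // hypothetical subscribe() overload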

Thanks

Re: Filtering support on Fetch API

Posted by Tom Scott <th...@gmail.com>.
Hi Talat,

  I've been following this with interest. Following on from the
multiple-topics answer, have you considered creating new logical topics
server-side? These would overlay existing topics but have filters applied.

  For instance, given a topic topic1, you could have an admin API that
creates a logical topic topic1_filtered_on_someHeader. This topic does not
store any data itself but instead reads and filters data from topic1. Any
consumers consuming from the logical topic will see only the filtered data.
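Roughly, the admin call might look something like this; createFilteredTopic
is hypothetical, there is no such method on Kafka's Admin client today:

    import java.util.Map;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    try (Admin admin = Admin.create(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"))) {
        // Hypothetical call: overlay topic1 with a header filter. The logical
        // topic stores no data; brokers serve it by reading topic1 and applying
        // the filter on the fly.
        admin.createFilteredTopic("topic1",                        // source topic
                                  "topic1_filtered_on_someHeader", // logical topic
                                  "someHeader", "someValue");      // header predicate
    }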

This has a couple of advantages:

   - The fetch API can be used as-is, avoiding potential bloat in the
   client -> broker traffic and the need to upgrade existing clients
   - Filtering can be managed centrally rather than at a client level
   - Governance benefits within the existing authorization frameworks (some
   clients can see only filtered topics, etc.)

Thanks

  Tom



Re: Filtering support on Fetch API

Posted by Igor Soarez <i...@soarez.me>.
Hi Talat,

Have you considered using 10x more topics - perhaps using multiple clusters - and avoiding any filtering in the clients?

--
Igor


Re: Filtering support on Fetch API

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Eric,

Thanks for your comments. My goal is to apply the filter without any
serialization.

The producer will generate the distinct header values per record batch, and
the broker will build an index for those header values, similar to the time
index. When a consumer applies a filter, the broker will filter only at the
record batch level. The filter will not guarantee exact results, but it will
reduce cost on the consumer side: the consumer still needs to do whatever it
does today, just on a smaller number of messages.

Do you see any issue? In this model I think we don't pay any penalty except
creating an additional index file on the broker and increasing storage size
a little bit.
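To sketch the batch-level check (all types here are hypothetical; the real
broker would read the index file rather than in-memory objects):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    /** Hypothetical: distinct header values of one record batch, written by the producer. */
    record BatchHeaderIndex(long baseOffset, Map<String, Set<String>> distinctValues) {
        /** Batch-level match: true means the batch MAY contain matching records. */
        boolean mayMatch(String key, String value) {
            Set<String> vals = distinctValues.get(key);
            return vals != null && vals.contains(value);
        }
    }

    /** Broker side: drop whole batches whose index proves no record can match. */
    static List<BatchHeaderIndex> filterBatches(List<BatchHeaderIndex> batches,
                                                String key, String value) {
        // Batches are skipped without decompressing or deserializing any records,
        // which is why results are approximate and the consumer still filters exactly.
        return batches.stream()
                      .filter(b -> b.mayMatch(key, value))
                      .collect(Collectors.toList());
    }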

Thanks


Re: Filtering support on Fetch API

Posted by Eric Azama <ea...@gmail.com>.
Something to keep in mind with your proposal is that you're moving the
Decompression and Filtering costs into the Brokers. It probably also adds a
new Compression cost if you want the Broker to send compressed data over
the network. Centralizing that cost on the cluster may not be desirable and
would likely increase latency across the board.

Additionally, because header values are byte arrays, the Brokers probably
would not be able to do very sophisticated filtering. Support for basic
comparisons of the built-in Serdes might be simple enough, but anything
more complex or involving custom Serdes would probably require a new
plug-in type on the broker.
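To illustrate the byte-array point: the broker sees only opaque bytes, and a
comparison like priority > 5 is only meaningful if the broker knows how the
producer encoded the value ("priority" is an illustrative header name):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Header;

    // rec is a ConsumerRecord; on the broker the situation is the same, only bytes.
    Header h = rec.headers().lastHeader("priority");
    byte[] raw = h.value();

    // The same bytes decode differently depending on the producer-side encoding:
    int asInt = ByteBuffer.wrap(raw).getInt();               // IntegerSerializer: 4-byte big-endian
    String asText = new String(raw, StandardCharsets.UTF_8); // or a UTF-8 string encoding
    // Supporting "priority > 5" broker-side therefore needs either a fixed encoding
    // convention for the built-in Serdes or the pluggable comparator mentioned above.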


Re: Filtering support on Fetch API

Posted by Xiangyuan LI <fl...@gmail.com>.
Server-side message filtering has been discussed for a very, very long time;
I noticed the Kafka community already discussed this feature several years
ago. The community still has no plan to implement it, for a few reasons:
- they don't want to break the zero-copy advantage of the fetch path
- they think a streams project is enough to do this work
- Kafka stores data as record batches, so filtering exact records may be
difficult.

We have already implemented this filter feature at our company, and we met
some specific problems you may be interested in:
- It's hard to filter compressed data. If we uncompress data, it becomes a
huge performance problem on the server. So we think splitting the record
header and the record data (perhaps via a new record version) may be a good
idea.
- We haven't created a "header index", so we filter messages when we receive
the fetch request. We have to calculate the exact data size (but if we keep
all records in memory, it may cause GC problems), or we can't set the
response length attribute. Your design may avoid this, but I guess your
"header index" would have to be a sparse index (see the sketch after this
list).
- A fetch request has no attribute indicating which group sent it, so we
added an extra field so that the server knows which filter to choose.
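For what it's worth, a sparse batch-level index could be as simple as a map
from header value to the base offsets of the batches that contain it; this
sketch is hypothetical and not modeled on any existing Kafka index file:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    /** Hypothetical sparse index: header value -> base offsets of batches containing it. */
    class SparseHeaderIndex {
        private final Map<String, NavigableSet<Long>> batchesByValue = new HashMap<>();

        /** Called once per batch at append time with that batch's distinct values. */
        void onBatchAppend(long baseOffset, Iterable<String> distinctValues) {
            for (String v : distinctValues) {
                batchesByValue.computeIfAbsent(v, k -> new TreeSet<>()).add(baseOffset);
            }
        }

        /** Batches at or after fetchOffset that MAY contain the value (batch granularity). */
        NavigableSet<Long> candidateBatches(String value, long fetchOffset) {
            NavigableSet<Long> all = batchesByValue.get(value);
            return all == null ? new TreeSet<>() : all.tailSet(fetchOffset, true);
        }
    }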

However, we hope Kafka will support this feature; it may reduce network
flow cost. Thanks.


Re: Filtering support on Fetch API

Posted by yang chen <sh...@gmail.com>.
Hi Talat,
We also need the filter feature, just like the Apache RocketMQ filter feature (
https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/).
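For reference, the consumer side of RocketMQ's SQL92 filtering looks like
this (topic, group, and property names are illustrative, and the broker must
have property filtering enabled):

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageSelector;

    // The broker evaluates the SQL92 predicate against user properties set by
    // the producer, so non-matching messages are never delivered to the consumer.
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("ExampleTopic",
            MessageSelector.bySql("region = 'us-west' AND priority > 5"));
    // ... register a MessageListener here ...
    consumer.start();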

>