Posted to dev@pulsar.apache.org by Aloys Zhang <al...@apache.org> on 2022/08/25 14:37:50 UTC

[Discuss] [PIP-204] Extensions for BrokerInterceptor

Hi Pulsar Community,

This is a PIP discussion on extending the BrokerInterceptor.
Details can be found in the issue:
https://github.com/apache/pulsar/issues/17267

I am copying the content here for convenience; any suggestions are welcome
and appreciated.

Motivation

Currently, we have a reconciliation system that compares the message (or
entry) accounts produced and consumed at the minute level, to indicate
whether all the data has been completely consumed by the consumers.
We want to track the message (or entry) properties (including the timestamp
and msgSize) for messages that have been persisted to the bookie and for
messages that have been consumed and acked. We also want to track producer
and consumer close events.
A good way to achieve this is to use the BrokerInterceptor to intercept
messages at certain points, so we want to add some interfaces to
BrokerInterceptor, as #12858 did.

Goal

- Get the timestamp and size (or other properties) of each entry that has
been persisted to the bookie
- Trace producer and consumer close events
- Get the timestamp and size of each entry that has been consumed and
acked (already supported by BrokerInterceptor)

API Changes

Intercept a message before it is persisted to the bookie, to get its
timestamp and size

    /**
     * Intercept a message before it is persisted to the bookie.
     *
     * @param producer the producer that published the message
     * @param headersAndPayload entry's headers and payload
     * @param publishContext publish context
     */
    default void beforeMessagePersistent(Producer producer,
                                         ByteBuf headersAndPayload,
                                         Topic.PublishContext publishContext) {
    }

Add methods that are called when a producer or consumer is closed

    /**
     * Called by the broker when a producer is closed.
     *
     * @param cnx client connection
     * @param producer Producer object
     * @param metadata a map of metadata
     */
    default void producerClosed(ServerCnx cnx,
                                Producer producer,
                                Map<String, String> metadata) {
    }

    /**
     * Called by the broker when a consumer is closed.
     *
     * @param cnx client connection
     * @param consumer Consumer object
     * @param metadata a map of metadata
     */
    default void consumerClosed(ServerCnx cnx,
                                Consumer consumer,
                                Map<String, String> metadata) {
    }
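
As a rough illustration of how a plugin might use the two close callbacks,
the sketch below counts close events per topic. It is self-contained and
does not use any Pulsar types: CloseEventTracker, its method names, and the
plain topic-name argument standing in for the Producer and Consumer objects
are all assumptions made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper, not part of Pulsar: tallies close events per topic.
final class CloseEventTracker {
    private final Map<String, LongAdder> producerCloses = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> consumerCloses = new ConcurrentHashMap<>();

    // Would be called from an implementation of producerClosed(...).
    void onProducerClosed(String topic) {
        producerCloses.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    // Would be called from an implementation of consumerClosed(...).
    void onConsumerClosed(String topic) {
        consumerCloses.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    // Number of producer close events seen for the topic (0 if none).
    long producerCloseCount(String topic) {
        LongAdder n = producerCloses.get(topic);
        return n == null ? 0 : n.sum();
    }

    // Number of consumer close events seen for the topic (0 if none).
    long consumerCloseCount(String topic) {
        LongAdder n = consumerCloses.get(topic);
        return n == null ? 0 : n.sum();
    }
}
```

The concurrent map and LongAdder are used here only because callbacks for
different connections could plausibly arrive on different threads; that is
an assumption of the sketch, not a statement about the broker.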

Extend beforeSendMessage so that it also receives the consumer

    /**
     * Intercept messages before sending them to the consumers.
     *
     * @param subscription pulsar subscription
     * @param entry entry
     * @param ackSet entry ack bitset; either <tt>null</tt> or an
     *               array of long-based bitsets
     * @param msgMetadata message metadata; it will be recycled
     *                    after this call
     * @param consumer the consumer the entry is sent to
     */
    default void beforeSendMessage(Subscription subscription,
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata,
                                   Consumer consumer) {
    }

Implementation

First, change the APIs as described above.

Then implement all the newly added interfaces in BrokerInterceptors.java
and BrokerInterceptorWithClassLoader.

Set all properties of interest (such as timestamp and msgSize) on the
MessagePublishContext in beforeMessagePersistent, before the entry is
persisted to the bookie.

After the entry has been persisted to the bookie, invoke
BrokerInterceptor.messageProduced() and read the properties from the
MessagePublishContext for reconciliation.
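
The two produce-side steps can be sketched as follows. This is a minimal,
self-contained illustration rather than the actual implementation:
SketchPublishContext is a hypothetical stand-in for the publish context,
ProduceSideSketch and its methods are invented for the example, and the
property names "msgSize" and "timestamp" are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the publish context; not Pulsar's class.
final class SketchPublishContext {
    private final Map<String, Object> properties = new HashMap<>();

    void setProperty(String name, Object value) {
        properties.put(name, value);
    }

    Object getProperty(String name) {
        return properties.get(name);
    }
}

// Hypothetical interceptor logic for the produce side (not thread-safe).
final class ProduceSideSketch {
    private long persistedMessages;
    private long persistedBytes;

    // Step 1: before persistence, stash the properties on the context.
    void beforePersist(SketchPublishContext ctx, byte[] headersAndPayload, long timestampMillis) {
        ctx.setProperty("msgSize", headersAndPayload.length);
        ctx.setProperty("timestamp", timestampMillis);
    }

    // Step 2: after persistence, read the size back and account for the entry.
    void afterPersist(SketchPublishContext ctx) {
        Object size = ctx.getProperty("msgSize");
        if (size instanceof Integer) {
            persistedMessages++;
            persistedBytes += (Integer) size;
        }
    }

    long persistedMessages() {
        return persistedMessages;
    }

    long persistedBytes() {
        return persistedBytes;
    }
}
```

Carrying the values on the context means the second step does not need to
touch the payload again; an entry whose context was never populated is
simply not counted.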

For consumption, record the properties of interest in beforeSendMessage,
before sending to the consumer; then, when the message is acked, retrieve
all the properties for reconciliation.
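
To make the reconciliation idea concrete, here is a small sketch of a
minute-level ledger that is fed from the produce side and the ack side.
MinuteReconciler and its methods are hypothetical, and the sketch assumes
that both sides are bucketed by the message's publish timestamp so that the
same message lands in the same minute on both sides.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reconciliation ledger, not part of Pulsar (not thread-safe).
final class MinuteReconciler {
    // Per-minute totals: index 0 = produced bytes, index 1 = acked bytes.
    private final Map<Long, long[]> buckets = new TreeMap<>();

    private long[] bucket(long timestampMillis) {
        long minute = timestampMillis / 60_000L;
        return buckets.computeIfAbsent(minute, m -> new long[2]);
    }

    // Called once an entry has been persisted.
    void recordProduced(long publishTimeMillis, int msgSize) {
        bucket(publishTimeMillis)[0] += msgSize;
    }

    // Called once a message has been acked by a consumer.
    void recordAcked(long publishTimeMillis, int msgSize) {
        bucket(publishTimeMillis)[1] += msgSize;
    }

    // True when everything produced in that minute has also been acked.
    boolean isBalanced(long timestampMillis) {
        long[] totals = buckets.get(timestampMillis / 60_000L);
        return totals == null || totals[0] == totals[1];
    }
}
```

A minute with no recorded traffic is treated as balanced; whether that is
the right choice depends on the reconciliation system.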

Re: [Discuss] [PIP-204] Extensions for BrokerInterceptor

Posted by Enrico Olivelli <eo...@gmail.com>.
Makes sense
+1

Enrico

On Fri, Aug 26, 2022 at 05:17 Aloys Zhang <al...@apache.org> wrote:
>
> Thanks for your suggestion.
> `onMessagePublish` is more precise and easier to understand.

Re: [Discuss] [PIP-204] Extensions for BrokerInterceptor

Posted by Aloys Zhang <al...@apache.org>.
Thanks for your suggestion.
`onMessagePublish` is more precise and easier to understand.

On Fri, Aug 26, 2022 at 10:30 PengHui Li <pe...@apache.org> wrote:

> Looks good to me.
> I just have one suggestion.
>
> Maybe `onMessagePublish` is clearer than `beforeMessagePersistent`.
> With `beforeMessagePersistent`, the starting point of the interception is
> not intuitive: it could be at the managed ledger, or just before the call
> to the BookKeeper client.
>
> `onMessagePublish` makes it clear that the broker has received a newly
> published message from the producer.
>
> Thanks,
> Penghui

Re: [Discuss] [PIP-204] Extensions for BrokerInterceptor

Posted by PengHui Li <pe...@apache.org>.
Looks good to me.
I just have one suggestion.

Maybe `onMessagePublish` is clearer than `beforeMessagePersistent`.
With `beforeMessagePersistent`, the starting point of the interception is
not intuitive: it could be at the managed ledger, or just before the call
to the BookKeeper client.

`onMessagePublish` makes it clear that the broker has received a newly
published message from the producer.

Thanks,
Penghui
