You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/25 02:52:18 UTC

[GitHub] [pulsar] aloyszhang opened a new issue, #17267: Extensions for broker interceptor

aloyszhang opened a new issue, #17267:
URL: https://github.com/apache/pulsar/issues/17267

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Motivation
   
   Currently, we have a reconciliation system that compares the message(or entry) accounts at the minutes level to indicate whether all the data has been consumed by the consumers. We need to trace the message before persistent to Bookie(including the timestamp and msgSize) and also the event of the producer or consumer closed.  
   
   A good way to achieve this is using the `BrokerInterceptor ` to interceptor the message at certain points.  We want to expand some interfaces for `BrokerInterceptor ` like what https://github.com/apache/pulsar/issues/12858  has done.
   
   
   ### Solution
   
   ### 1.  interceptor message before persistent to bookie
   
   ```java
   /**
        * Intercept after a message before persistent to bookie.
        *
        * @param headersAndPayload entry's header and payload
        * @param publishContext Publish Context
        */
       default void beforeMessagePersistent(Producer producer,
                                            ByteBuf headersAndPayload,
                                            Topic.PublishContext publishContext) {
   
       }
   ```
   
   ### 2. Add interfaces for producer or consumer closed:
   ```java
   /**
        * Called by the broker when a producer is closed.
        *
        * @param cnx      client Connection
        * @param producer Consumer object
        * @param metadata A map of metadata
        */
       default void producerClosed(ServerCnx cnx,
                                   Producer producer,
                                   Map<String, String> metadata) {
       }
   
    /**
        *  Called by the broker when a consumer is closed.
        *
        * @param cnx client Connection
        * @param consumer Consumer object
        * @param metadata A map of metadata
        */
       default void consumerClosed(ServerCnx cnx,
                                   Consumer consumer,
                                   Map<String, String> metadata) {
       }
   ```
   ### 3.  expand the `beforeSendMessage ` to  support consumer
   ```java
   /**
        * Intercept messages before sending them to the consumers.
        *
        * @param subscription pulsar subscription
        * @param entry entry
        * @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
        * @param msgMetadata message metadata. The message metadata will be recycled after this call.
        * @param consumer consumer. Consumer which entry are sent to.
        */
       default void beforeSendMessage(Subscription subscription,
                                      Entry entry,
                                      long[] ackSet,
                                      MessageMetadata msgMetadata,
                                      Consumer consumer) {
       }
   ```
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui closed issue #17267: PIP-204 Extensions for broker interceptor

Posted by GitBox <gi...@apache.org>.
codelipenghui closed issue #17267: PIP-204 Extensions for broker interceptor
URL: https://github.com/apache/pulsar/issues/17267


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] aloyszhang commented on issue #17267: PIP-204 Extensions for broker interceptor

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on issue #17267:
URL: https://github.com/apache/pulsar/issues/17267#issuecomment-1232366134

   PIP VOTE thead : https://lists.apache.org/thread/ckxqgoyjkqgbp6szn2vh9ynzbsjxm3yy 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] aloyszhang commented on issue #17267: PIP-204 Extensions for broker interceptor

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on issue #17267:
URL: https://github.com/apache/pulsar/issues/17267#issuecomment-1227971252

   PIP discuss thread : https://lists.apache.org/thread/3zgpbxffo7gzsb5mdh0sk2rq27bgw2bo


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org