Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/18 07:27:35 UTC

[GitHub] [pulsar] BewareMyPower opened a new issue #12087: PIP 96: Payload converter for Pulsar client

BewareMyPower opened a new issue #12087:
URL: https://github.com/apache/pulsar/issues/12087


   ## Motivation
   
   The initial motivation came from the Kafka protocol handler, KoP (https://github.com/streamnative/kop). KoP allows the entry format to be configured as `kafka`, which means messages from a Kafka producer can be written to bookies directly, without any conversion between Kafka's message format and Pulsar's message format. This improves performance significantly. However, it also introduces the limitation that a Pulsar consumer cannot consume such a topic, because it cannot recognize the entry's format.
   
   The existing `ConsumerInterceptor` cannot help in this case because it operates on a `Message<T>` object, which is converted from an entry that already has a valid Pulsar format.
   
   ## Goal
   
   This proposal introduces a payload converter for the Pulsar client, which converts the payload (excluding the metadata parts) at the client side. With a suitable converter configured, the Pulsar client can recognize an entry of any format.
   
   This proposal is mainly for protocol handlers, because they can access `PersistentTopic` and write bytes to bookies directly. In the rare case that users write to the topic's ledger directly with a BookKeeper client, the converter can handle that as well.
   
   ## API Changes
   
   First, an interface is added under the package `org.apache.pulsar.common.api.raw`.
   
   ```java
   public interface PayloadConverter {
   
       /**
        * Convert the buffer to the format that Pulsar consumer can recognize.
        *
        * @param brokerEntryMetadata the {@link BrokerEntryMetadata} object that might contain the buffer's format
        * @param metadata the {@link MessageMetadata} object that might contain the buffer's format
        * @param payload the payload buffer that doesn't contain the {@link MessageMetadata} part
        * @return the converted payload buffer
        * @implNote There are some constraints for the valid implementation:
        *   1. The refCnt of `payload` must not change after this call.
        *   2. It could either return `payload` itself or a new allocated buffer whose refCnt is 1.
        *   3. If it returns `payload`, the reader index and writer index must keep unchanged.
        */
       ByteBuf convert(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata metadata, ByteBuf payload);
   }
   ```
   
   The metadata parts, including `BrokerEntryMetadata` and `MessageMetadata`, determine the format of the payload. These metadata types are necessary because the current consumer implementation parses them first for subsequent operations; see
   
   https://github.com/apache/pulsar/blob/5fd62a9847a4f574708e3002482aaf876de8d540/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1011-L1019
   
   Since the pulsar-client-api module should not introduce any dependency, to avoid a cyclic dependency, this proposal loads `PayloadConverter` at runtime, similar to what the SLF4J API does. We only need to configure whether it's enabled and, optionally, the package. See the following methods in `ConsumerBuilder`.
   
   ```java
       /**
        * Enable the payload converter to convert message payload before parsing it.
        *
        * Default: false
        *
        * If it's enabled, each time a consumer is created, it will try to load classes under a specific package. The first
        * class that implements PayloadConverter will be used as the converter instance's type. If no PayloadConverter is
        * found, the default converter is a trivial converter that returns payload itself.
        *
        * The converter instance is constructed with no argument.
        *
        * See PayloadConverter interface's definition from org.apache.pulsar.common.api.raw package of
        * org.apache.pulsar:pulsar-common artifact.
        *
        * @see ConsumerBuilder#payloadConverterPackage(String)
        *
        * @param enablePayloadConverter
        * @return
        */
       ConsumerBuilder<T> enablePayloadConverter(boolean enablePayloadConverter);
   
       /**
        * Set the package that might con
        *
        * Default: org.apache.pulsar.client.converter
        *
        * @param packageName
        * @return
        */
       ConsumerBuilder<T> payloadConverterPackage(String packageName);
   ```
   
   In conclusion, we should follow these steps to use a payload converter:
   
   1. Implement `PayloadConverter` under a specific package like `org.apache.pulsar.client.converter`.
   2. Configure `enablePayloadConverter` at the client side, and also `payloadConverterPackage` if the converter's package is not `org.apache.pulsar.client.converter`.
   3. Add the compiled class that implements `PayloadConverter` to the class path.
   
   ## Implementation
   
   Add a `PayloadConverter` field in `ConsumerBase` and initialize it when a consumer is created. The initialization process is:
   
   1. Find all classes that implement the `PayloadConverter` interface under the configured package.
   2. If no classes are found, use the default converter, which simply returns the payload.
   3. Otherwise, choose the first class as the actual converter.
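   
   The three steps above can be sketched as follows. This is only an illustration under stated assumptions: class-path scanning is omitted (the candidate classes are passed in as a list), `byte[]` stands in for `ByteBuf` so that no Netty dependency is needed, and `SketchPayloadConverter`, `IdentityConverter`, `ReversingConverter` and `ConverterLoader` are hypothetical names that are not part of the proposal.
   
   ```java
   import java.lang.reflect.Modifier;
   import java.util.List;
   
   // Hypothetical stand-in for PayloadConverter; byte[] replaces ByteBuf.
   interface SketchPayloadConverter {
       byte[] convert(byte[] payload);
   }
   
   // Trivial default converter: returns the payload itself.
   class IdentityConverter implements SketchPayloadConverter {
       @Override
       public byte[] convert(byte[] payload) {
           return payload;
       }
   }
   
   // Example user-provided converter that reverses the payload bytes.
   class ReversingConverter implements SketchPayloadConverter {
       @Override
       public byte[] convert(byte[] payload) {
           byte[] out = new byte[payload.length];
           for (int i = 0; i < payload.length; i++) {
               out[i] = payload[payload.length - 1 - i];
           }
           return out;
       }
   }
   
   class ConverterLoader {
       // `candidates` are the classes already found under the configured package.
       static SketchPayloadConverter load(List<Class<?>> candidates) {
           for (Class<?> candidate : candidates) {
               boolean concrete = !Modifier.isAbstract(candidate.getModifiers());
               if (concrete && SketchPayloadConverter.class.isAssignableFrom(candidate)) {
                   try {
                       // The converter instance is constructed with no argument.
                       return (SketchPayloadConverter) candidate.getDeclaredConstructor().newInstance();
                   } catch (ReflectiveOperationException e) {
                       throw new IllegalStateException("Cannot instantiate " + candidate.getName(), e);
                   }
               }
           }
           // No implementation found: fall back to the trivial converter.
           return new IdentityConverter();
       }
   }
   ```
   
   Taking the first match keeps the lookup simple, but the result then depends on discovery order when several implementations share a package.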
   
   Once the converter is initialized, after the `BrokerEntryMetadata` and `MessageMetadata` objects are parsed from the original buffer, apply the converter to generate a new payload buffer for the subsequent processes.
   
   It should be noted that the original buffer will be released in `PulsarDecoder#handleRead`. However, if the `convert` method returns a new buffer, we need to release it.
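   
   A minimal sketch of that release rule, assuming a hypothetical `RefBuf` class in place of Netty's `ByteBuf` (only the reference count matters here). `BufConverter` and `ReleaseSketch` are illustrative names as well. The caller releases the converted buffer only when it is a different object from the original payload, because the original is released elsewhere.
   
   ```java
   // Hypothetical minimal stand-in for a reference-counted buffer such as ByteBuf.
   class RefBuf {
       private int refCnt = 1;
       final byte[] data;
   
       RefBuf(byte[] data) {
           this.data = data;
       }
   
       int refCnt() {
           return refCnt;
       }
   
       void release() {
           if (refCnt == 0) {
               throw new IllegalStateException("already released");
           }
           refCnt--;
       }
   }
   
   // Hypothetical converter shape: returns `payload` itself or a new buffer with refCnt 1.
   interface BufConverter {
       RefBuf convert(RefBuf payload);
   }
   
   class ReleaseSketch {
       // Returns the number of bytes handled; stands in for the real message parsing.
       static int process(BufConverter converter, RefBuf payload) {
           RefBuf converted = converter.convert(payload);
           try {
               return converted.data.length;
           } finally {
               // Only a newly allocated buffer is owned (and released) by this caller.
               if (converted != payload) {
                   converted.release();
               }
           }
       }
   }
   ```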
   
   The tests should cover the following cases:
   
   - Consumers for a non-partitioned topic or multiple topics both work.
   - The `BrokerEntryMetadata` and `MessageMetadata` passed to the converter are the actual values.
   - The converters don't affect the default behavior if `enablePayloadConverter` is false.
   
   ## Rejected Alternatives
   
   An earlier proposal, since discarded, tried to perform the conversion at the broker side before dispatching entries. However, the dispatcher runs in a single thread when dispatching entries, so the conversion cost might affect all other topics.
   
   Therefore, we should do it at the client side. It could add overhead for the Pulsar consumer, but it doesn't affect other topics.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui edited a comment on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
codelipenghui edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930697502


   The proposal LGTM. Thanks @eolivelli and @BewareMyPower for the great context and discussion. I have only one point, about the name `EntryContext`: `Entry` is a concept from BookKeeper, and currently we don't have any concept of an entry at the client side, so maybe `BatchContext` is more reasonable here? Great proposal! @BewareMyPower



[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925832435


   @eolivelli 
   
   See this commit: https://github.com/BewareMyPower/pulsar/commit/e51486357e42af73a70a5ea697e64c67914f9922
   
   - `DefaultPayloadConverter` is an implementation for the default case. It uses `newSingleMessage` to create each single message of a batch.
   - The actual implementation is in `ConsumerImpl#newSingleMessage`, which is simply called in `MessageContextImpl#newSingleMessage`.
   - See the removed code in `ConsumerImpl#messageReceived` for comparison.



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927003655


   I have simplified the design and updated the description, PTAL again. @eolivelli 
   
   Some changes:
   1. Rename `MessageContext` to `EntryContext` to avoid misunderstanding, because `MessageContext` looks like a context of `Message<T>`.
   2. Simplify the `MessagePayload` interface. It's now only responsible for converting itself to a `ByteBuf`.
   3. Don't apply a default converter for the consumer.
   
   You can also see https://github.com/apache/pulsar/pull/12088 for details.



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922444202


   Now I understand the problem better.
   Let me think more about it....
   
   My first thought is that if we want this to be part of the client's supported API, then we must explicitly link to those APIs.
   
   Maybe in some new Pulsar client API extensions package?
   
   Otherwise we risk failing to maintain compatibility in future versions.
   



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922910119


   > if I didn't understand wrong, I think it's a part of converter's implementation. 
   
   Sorry, I didn't check the implementation (this is expected, because if I look at the implementation we could miss some details here in the document).
   So the example implementation simply skips messages that are not handled by the Converter.
   
   I see only one problem with this approach: if the Consumer does not load the Converter, the application will receive corrupted data, because it fails to decode it.
   
   If we explicitly add a metadata field "messageFormat" to the metadata, then the PulsarClient can:
   1) look up the Converter, given the format
   2a) fail: report an error to the user and prevent the application from processing garbage
   2b) process the conversion
   
   We can do this in two ways:
   - have a dictionary inside the Pulsar Client that maps the MessageConverted to a format
   - add an "acceptFormat(String format)" method to the MessageConverted interface (and let the Pulsar Client fail the read if no MessageConverted accepts the format)
   
   
   



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922490024


   Another thought about this PIP, thinking out loud:
   The selection of the decoder should be automatic, depending on some metadata in the message, the same way we do with compression (compression could probably be made pluggable one day, once we solve the problem for the payload converter...)



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930172959


   Since the code needs a lot of changes, I updated the proposal first. PTAL @eolivelli 
   
   In addition to your suggestion, I added a `Consumer<Throwable>` parameter to handle the exceptional case. In that case, the Pulsar consumer will discard the message and send a `CommandAck` with `ValidationError.BatchDeSerializeError`.



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930048387







[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930004829


   Update: `PayloadConverter#whenInterrupted()` method was added according to @eolivelli 's suggestion.



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925856254


   In addition, since it only affects the client side, which usually doesn't consume many topics, the overhead of iterator allocations might not be significant IMO. But it still needs tests to prove that. I think we can skip the converter for the default case, as you've said. @eolivelli There will still be some refactoring of `ConsumerImpl`, though.



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930299033


   Catching `Throwable` is not good; we should not ask the PayloadProcessor developer to do this.
   
   Why not change the signature to 
   ```
   public <T> void process(MessagePayload payload,
                                   EntryContext context,
                                   Schema<T> schema,
                                   Consumer<Message<T>> messageConsumer) throws Exception
   ```
   
   Then we deal with catching any Throwable in Pulsar code?
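   
   A rough sketch of that division of labour, with heavily simplified stand-ins: `ProcessorSketch` and `ProcessorCaller` are hypothetical names, `String` replaces `Message<T>`, and the schema and context parameters are left out. The processor may throw freely, and the calling code catches any `Throwable` and discards the entry. Staging messages until processing succeeds is an assumption of this sketch, not something the thread specifies.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;
   
   // Hypothetical, simplified processor: it may throw instead of catching errors itself.
   interface ProcessorSketch {
       void process(byte[] payload, Consumer<String> messageConsumer) throws Exception;
   }
   
   class ProcessorCaller {
       // Returns true if the entry was processed, false if it was discarded.
       static boolean dispatch(ProcessorSketch processor, byte[] payload, List<String> received) {
           List<String> staged = new ArrayList<>();
           try {
               processor.process(payload, staged::add);
           } catch (Throwable t) {
               // The client code, not the processor author, handles the failure.
               // A real client would also acknowledge the entry with a validation error here.
               return false;
           }
           // Assumption of this sketch: deliver messages only if the whole entry succeeded.
           received.addAll(staged);
           return true;
       }
   }
   ```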
   



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-928669196


   I updated the PIP now, PTAL @eolivelli .
   
   The methods are renamed. But I don't pass the `EntryContext` object to `PayloadConverter#afterConnect`. The reason is similar to what I've said before: the entry context might not be the actual message context. We should record the actual message context in the `PayloadConverter` implementation.
   
   Here's an example, see [CustomBatchConverter](https://github.com/apache/pulsar/pull/12088/files#diff-4af2bb02ccc23bc94f852308aa50284cdc9aa521f9db49cff94963625fa1c02a). The number of messages is retrieved from payload, not the entry context.



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-923534386


   I think our goal is to configure multiple converters and choose the proper one for the message. Since I missed the case in this PIP, I merged them into one. You're right that the `accept` method should be separated.
   
   > If we explicitly add a metadata field "messageFormat" to the metadata
   
   But existing messages don't have this field. For example, KoP adds the property `entry.format=kafka` to mark messages as Kafka format. I think the detailed steps should be:
   
   1. Check the `format` field of the metadata.
   2. If it's set, look up the converter according to the `format` field:
       1. Fail: report an error to the user and prevent the application from processing garbage.
       2. Process the conversion.
   3. Otherwise, iterate over all converters to find the first converter that accepts the message.
       1. If no converter is found, fall back to the default converter, which simply returns the payload.
       2. Process the conversion.
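   
   The lookup order described in these steps could look roughly like the sketch below. All names (`FormatAwareConverter`, `KafkaFormatConverterSketch`, `ConverterRegistry`) are hypothetical, the explicit `format` field is modelled as a nullable `String`, and message properties as a plain `Map`. The "conversion" only copies the bytes, since real decoding is out of scope.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical converter shape with a separate accept step.
   interface FormatAwareConverter {
       boolean accept(Map<String, String> properties);
   
       byte[] convert(byte[] payload);
   }
   
   // Illustrative converter that recognises the `entry.format=kafka` property.
   class KafkaFormatConverterSketch implements FormatAwareConverter {
       @Override
       public boolean accept(Map<String, String> properties) {
           return "kafka".equals(properties.get("entry.format"));
       }
   
       @Override
       public byte[] convert(byte[] payload) {
           return payload.clone(); // placeholder for the real format conversion
       }
   }
   
   class ConverterRegistry {
       private final Map<String, FormatAwareConverter> byFormat = new HashMap<>();
       private final List<FormatAwareConverter> all = new ArrayList<>();
   
       void register(String format, FormatAwareConverter converter) {
           byFormat.put(format, converter);
           all.add(converter);
       }
   
       // `format` is the explicit metadata field, or null when the message does not set it.
       byte[] convert(String format, Map<String, String> properties, byte[] payload) {
           if (format != null) {
               FormatAwareConverter converter = byFormat.get(format);
               if (converter == null) {
                   // Fail loudly instead of handing undecodable bytes to the application.
                   throw new IllegalStateException("No converter for format: " + format);
               }
               return converter.convert(payload);
           }
           for (FormatAwareConverter converter : all) {
               if (converter.accept(properties)) {
                   return converter.convert(payload);
               }
           }
           // Default: no converter accepted the message, return the payload unchanged.
           return payload;
       }
   }
   ```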



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-929315736


   The current design of `MessagePayload` might still look a little weird, especially the `copiedBuffer()` method. I might make some changes to `MessagePayload` tomorrow.



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927917443


   Regarding `default void cleanup() {}`
   
   The word 'cleanup' reads like something that is to be performed to "clean" after some operation or in the end.
   But if I understand correctly it is unrelated to any "convert" operation.
   
   I believe that we should pick a better name:
   1) afterConvert(EntryContext....) : if we want to call it after every "convert"
   2) close(): if we want to call it when shutting down the Consumer
   3) update(): if we want to call it sometimes, without any constraint
   
   I like 1) and 2) (we can do both), because from a developer's perspective it is clear "when" the method is called.
   Option 3) does not guarantee that the method will ever be called, so it would only be good for updating some metrics, as in your example.
   



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927912570


   @eolivelli Thanks for your feedback! Here're my answers.
   
   > `default void cleanup() {}`
   > It is not clear to me when this method will be called, when we are closing the `Consumer` ?
   > Because it has not reference to the `EntryContext`, so it is not related to `convert`
   
   Since `PayloadConverter#convert` returns an iterable of messages, it's called in this way in `ConsumerImpl`:
   
   ```java
   for (Message<T> msg : converter.convert(entryContext, payload, schema)) {
       // Do something with msg
   }
   converter.cleanup();
   ```
   
   For example, if we want to collect the total bytes of each conversion and expose it to outside, we might implement the converter as
   
   ```java
       private long totalBytes = 0;
   
       @Override
       public <T> Iterable<Message<T>> convert(EntryContext context, MessagePayload payload, Schema<T> schema) {
           /* ... */
           return () -> new Iterator<Message<T>>() {
               /* ... */
   
               @Override
               public Message<T> next() {
                   final Message<T> msg = isBatch
                           ? context.newSingleMessage(index, numMessages, payload, true, schema)
                           : context.newMessage(payload, schema);
                   totalBytes += msg.getData().length;
                   return msg;
               }
           };
       }
   
       @Override
       public void cleanup() {
           recordStatistics(totalBytes);  // pseudo code
       }
   ```
   
   I'm not sure whether there's a better name.
   
   Regarding the `newSingleMessage` API design, you're right that the `numMessages` parameter should be removed.
   
   > if EntryContext is a wrapper for an Entry maybe we can call it EntryWrapper
   
   No, it's only a wrapper for the Entry's metadata, not including the payload. So I think the name `Context` is better.
   
   And the key point of the latest design is to **avoid building the batch**. If we built a batch using the converter, `ConsumerImpl` would need to deserialize the batch into multiple single messages, which is unnecessary.
   
   1. If the entry is a batch, call `newSingleMessage` in each iteration.
   2. Otherwise, call `newMessage` to convert the entry payload to a message.
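   
   To illustrate the "avoid building the batch" idea, here is a simplified sketch in which each message is created only when the iterator is advanced. `EntryContextSketch` and `LazyConverterSketch` are hypothetical, messages are plain `String`s, and the batch layout (one byte per message) is invented purely for the example.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   
   // Hypothetical, simplified stand-in for EntryContext.
   class EntryContextSketch {
       final boolean batch;
       final int numMessages;
       int created = 0; // how many messages have been materialised so far
   
       EntryContextSketch(boolean batch, int numMessages) {
           this.batch = batch;
           this.numMessages = numMessages;
       }
   
       // Invented layout: the message at `index` is the single byte at that position.
       String newSingleMessage(int index, byte[] payload) {
           created++;
           return "msg-" + (char) payload[index];
       }
   
       String newMessage(byte[] payload) {
           created++;
           return "msg-" + new String(payload, StandardCharsets.UTF_8);
       }
   }
   
   class LazyConverterSketch {
       static Iterable<String> convert(EntryContextSketch context, byte[] payload) {
           return () -> new Iterator<String>() {
               private final int total = context.batch ? context.numMessages : 1;
               private int index = 0;
   
               @Override
               public boolean hasNext() {
                   return index < total;
               }
   
               @Override
               public String next() {
                   if (!hasNext()) {
                       throw new NoSuchElementException();
                   }
                   // Each message is built on demand; no intermediate batch is assembled.
                   String msg = context.batch
                           ? context.newSingleMessage(index, payload)
                           : context.newMessage(payload);
                   index++;
                   return msg;
               }
           };
       }
   }
   ```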



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925853307


   > if (I have a converter) .... run the new code
   > else .... run the optimized existing code without the Converter ?
   
   I think yes. Maybe the code will look ugly. I'll try this way soon.



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922748959


   @BewareMyPower 
   I think that we have to think more in general about these extensibility points that we add to Pulsar, especially on the client.
   
   On one hand, we need the power to access the internals of Pulsar (and use internal APIs, Netty....).
   On the other hand, we must provide the end user with something that:
   - is able to deal with backward/forward compatibility
   - does not add additional dependencies on the client (pulsar-common??)
   
   I believe that those things can only be implemented if we create a public API and maintain it, so we have to create public interfaces in pulsar-client-api module that represent the internal components that we want to manage.
   
   For instance we cannot expose Netty here, because in that case clients that use a Shaded version of Netty will not work.
   We do not want to access Broker Message Metadata....
   
   so we need something like:
   
   `ByteBuffer convert(MessageContext message, ByteBuffer content)` -> No good, ByteBuffer does not fully represent ByteBuf
   or
   `byte[] convert(MessageContext message, byte[] content)` -> very bad, everybody knows...
   or
   `Payload convert(MessageContext message, Payload payload)`
   
   with
   ```
   interface MessageContext {
       ... getMetadata(....)
   }
   ```
   and a high-level wrapper
   ```
   interface Payload {
       int size()
       getInputStream()
       copyTo(....)
       static Payload wrap(byte[])
       static Payload wrap(ByteBuffer....)
       ...add whatever is needed...
   }
   ```
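   
   One way such a wrapper could be fleshed out with only JDK types is sketched below. `PayloadSketch` is a hypothetical name, and the elided parts of the outline above (the `copyTo` parameters, the return type of `getInputStream`) are filled in here as assumptions for illustration only.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.InputStream;
   import java.nio.ByteBuffer;
   
   // Hypothetical payload wrapper that keeps Netty types out of the public API.
   interface PayloadSketch {
       int size();
   
       InputStream getInputStream();
   
       // Assumed signature: copies the whole payload to the start of `destination`.
       void copyTo(byte[] destination);
   
       // Wraps the array without copying it.
       static PayloadSketch wrap(byte[] bytes) {
           return new PayloadSketch() {
               @Override
               public int size() {
                   return bytes.length;
               }
   
               @Override
               public InputStream getInputStream() {
                   return new ByteArrayInputStream(bytes);
               }
   
               @Override
               public void copyTo(byte[] destination) {
                   System.arraycopy(bytes, 0, destination, 0, bytes.length);
               }
           };
       }
   
       // Copies the remaining bytes; duplicate() leaves the caller's position untouched.
       static PayloadSketch wrap(ByteBuffer buffer) {
           byte[] copy = new byte[buffer.remaining()];
           buffer.duplicate().get(copy);
           return wrap(copy);
       }
   }
   ```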
         
         
         
   
   



[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927893303


   I am still not sure about this:
   ```
   <T> Message<T> newSingleMessage(int index,
                                       int numMessages,
                                       MessagePayload payload,
                                       boolean containMetadata,
                                       Schema<T> schema);
   ```
   
   problems:
   - we are passing `numMessages` for each message, so we could pass different values. My understanding is that you should use the same value for each message in the new batch
   - it is not clear whether we can create a new batch that has a different number of messages than the original batch
   - why do we have to call "newSingleMessage" on `EntryContext`? Is the `EntryContext` a wrapper for the original Message (Entry) in Pulsar?
   
   If `EntryContext` is a wrapper for an `Entry`, maybe we can call it `EntryWrapper`.
   If it adds more functionality than a pure wrapper, can we have two objects?
   - `EntryWrapper` -> contains methods to access the original Entry and the Message in the batch
   - `ConverterContext` -> contains utility methods to build Messages 
   
   ```
   interface ConverterContext {
          <T> BatchBuilder<T> newBatch(int numberOfMessages, Schema<T> schema);
          <T> Message<T> newMessage(MessagePayload payload, Schema<T> schema);
     }
   ```
     
   ```
   interface BatchBuilder<T> {
            Message<T> newMessage(int index, MessagePayload payload....) // not sure if we need to pass index, we can compute it automatically probably
            Iterator<Message<T>> buildMessages();
   }
   ```
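   
   As a rough illustration of computing the index automatically, the sketch below fixes the batch size once at construction and derives each index from insertion order. `BatchBuilderSketch` is hypothetical, and a message is represented as a `String` of the form `index/size:payload` just to make the effect visible.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   
   // Hypothetical builder: the batch size is given once, indexes are assigned internally.
   class BatchBuilderSketch {
       private final int numberOfMessages;
       private final List<String> messages = new ArrayList<>();
   
       BatchBuilderSketch(int numberOfMessages) {
           this.numberOfMessages = numberOfMessages;
       }
   
       String newMessage(byte[] payload) {
           if (messages.size() >= numberOfMessages) {
               throw new IllegalStateException("batch is full");
           }
           // The index is the insertion position, so callers cannot pass inconsistent values.
           int index = messages.size();
           String message = index + "/" + numberOfMessages + ":"
                   + new String(payload, StandardCharsets.UTF_8);
           messages.add(message);
           return message;
       }
   
       Iterator<String> buildMessages() {
           // Iterate over a snapshot so later additions do not affect this iterator.
           return new ArrayList<>(messages).iterator();
       }
   }
   ```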



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930686700


   Makes sense. I updated the proposal again. PTAL @eolivelli 



[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-929967313


   I added a `MessagePayloadFactory` interface with a `DEFAULT` instance so that users do not need to use `MessagePayloadImpl` directly. Regarding the `BatchMessageAcker` issue, I added `acknowledge` and `acknowledgeCumulative` invocations in the tests. It might not affect the proposal itself and should be discussed in the PR review phase.
   
   /cc @codelipenghui @hangc0276 @315157973 





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927912570


   @eolivelli Thanks for your feedback! Here are my answers.
   
   > `default void cleanup() {}`
   > It is not clear to me when this method will be called, when we are closing the `Consumer` ?
   > Because it has not reference to the `EntryContext`, so it is not related to `convert`
   
   Since `PayloadConverter#convert` returns an `Iterable` of messages, `cleanup` is called in `ConsumerImpl` once the iteration has finished:
   
   ```java
   for (Message<T> msg : converter.convert(entryContext, payload, schema)) {
       // Do something with msg
   }
   converter.cleanup();
   ```
   
   For example, to collect the total number of bytes of each conversion and expose it externally, we might implement the converter as follows:
   
   ```java
       private long totalBytes = 0;
   
       @Override
       public <T> Iterable<Message<T>> convert(EntryContext context, MessagePayload payload, Schema<T> schema) {
           /* ... */
           return () -> new Iterator<Message<T>>() {
               /* ... */
   
               @Override
               public Message<T> next() {
                   final Message<T> msg = isBatch
                           ? context.newSingleMessage(index, numMessages, payload, true, schema)
                           : context.newMessage(payload, schema);
                   totalBytes += msg.getData().length;
                   return msg;
               }
           };
       }
   
       @Override
       public void cleanup() {
           recordStatistics(totalBytes);  // pseudo code
       }
   ```
   
   I'm not sure whether there's a better name.
   
   Regarding the `newSingleMessage` API design, you're right that the `numMessages` parameter should be removed.
   
   > if EntryContext is a wrapper for an Entry maybe we can call it EntryWrapper
   
   No, it's only a wrapper for the Entry's metadata, not including the payload. So I think the name `Context` is better.
   
   The key point of the latest design is to **avoid building the batch**. If the converter built a batch, `ConsumerImpl` would still need to deserialize that batch into individual messages, so the `payload -> batch -> messages` conversion is unnecessary. This PR prefers `payload -> Iterable<Message>`, where the `Iterable` tracks its current progress internally.
   
   1. If the entry is a batch, call `newSingleMessage` in each iteration.
   2. Otherwise, call `newMessage` to convert the entry payload to a message.





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930733199


   After thinking about it for a while, I find `PayloadContext` clearer. My previous concerns can be addressed by the documentation.
   
   - MessagePayloadContext: the context of a message payload
   - MessagePayload: the message payload itself
   - MessagePayloadProcessor: the processor for a message payload
   
   It looks more consistent. BTW, I added the `Message` prefix to context and processor.





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925839190


   from your example it looks like newSingleMessage is useful to extract a Message from a batch.
   how can you apply this to KOP ?
   
   do we need to refactor ConsumerImpl ?
   I thought that this would be a simpler change, but in fact you have to deal with the fact that the Converter may decide to split the single message into multiple messages.
   
   I am not sure if we should have a `DefaultConverter`.
   if you really need it please make it more evident in the PIP description.
   
   I am not sure about the overhead we are introducing with the DefaultConverter.
   isn't it possible to do like:
   
   if (I have a converter) .... run the new code
   else .... run the optimized existing code without the Converter ?
   
   we will avoid many allocations and using many CPU cycles
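   
   That branch could be sketched roughly as follows. `ReceivePathSketch` and its `String`-based converter hook are hypothetical stand-ins, not the actual `ConsumerImpl` code. A `null` converter selects the existing path, so nothing extra is allocated when no converter is configured.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Function;
   
   // Hypothetical stand-in for the consumer's receive path; not the real ConsumerImpl.
   final class ReceivePathSketch {
       // Optional converter hook (an assumption for this sketch); null means "not configured".
       private final Function<String, Iterable<String>> converter;
   
       ReceivePathSketch(Function<String, Iterable<String>> converter) {
           this.converter = converter;
       }
   
       List<String> receive(String entry) {
           List<String> received = new ArrayList<>();
           if (converter != null) {
               // New code path: let the converter split the entry into messages.
               for (String message : converter.apply(entry)) {
                   received.add(message);
               }
           } else {
               // Existing path: the entry is handed over as-is, with no converter involved.
               received.add(entry);
           }
           return received;
       }
   }
   ```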
   
   cc @merlimat 
   





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925832435


   @eolivelli 
   
   See this commit: https://github.com/BewareMyPower/pulsar/commit/e51486357e42af73a70a5ea697e64c67914f9922
   
   - `DefaultPayloadConverter` is an implementation for the default case. It uses `newSingleMessage` to create each single message of a batch.
   - The actual implementation is in `ConsumerImpl#newSingleMessage`, which is simply called in `MessageContextImpl#newSingleMessage`.
   - See the removed code in `ConsumerImpl#messageReceived` for comparison.
   
   BTW, I'm working on the tests now; they will include more examples for the converter.





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925850421


   @eolivelli 
   
   > how can you apply this to KOP ?
   
   Each entry produced by KoP is a buffer that can be converted to a `MemoryRecords`.
   
   ```java
       public static MemoryRecords readableRecords(ByteBuffer buffer) {
           return new MemoryRecords(buffer);
       }
   ```
   
   Then we can reuse the method of `MemoryRecords` to create iterable records
   
   ```java
       public Iterable<Record> records() {
           return records;
       }
   ```
   
   Each `Record` is a single message, i.e. `MessageImpl<T>`.
   
   > I thought that this would be a simpler change, but in fact you have to deal with the fact that the Converter may decide to split the single message into multiple messages.
   
   No, the converter does the same thing as the previous code. The buffer is an entry, not a single message.
   
   The default converter's behavior is:
   1. If the entry is a single message, use `newMessage` to create an `Iterable<Message<T>>` that yields only once, because it contains only one message.
   2. If the entry is a batched message, create an `Iterable<Message<T>>` and, during each iteration, use `newSingleMessage` to create a single message.
   
   The difference is that the previous implementation uses a for loop, while the current implementation creates an iterator. The overhead is the allocation of the iterator.
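   
   The lazy iteration described above can be sketched in isolation. `LazyEntryMessages` is a hypothetical class, and its `IntFunction` parameter merely stands in for the `newMessage` / `newSingleMessage` calls; none of this is the actual Pulsar code. A non-batched entry is modeled as a count of 1.
   
   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.function.IntFunction;
   
   // Hypothetical illustration of an Iterable that creates each message only when asked for it.
   final class LazyEntryMessages<M> implements Iterable<M> {
       private final int numMessages;           // 1 for a non-batched entry
       private final IntFunction<M> messageAt;  // stands in for newMessage / newSingleMessage
   
       LazyEntryMessages(int numMessages, IntFunction<M> messageAt) {
           this.numMessages = numMessages;
           this.messageAt = messageAt;
       }
   
       @Override
       public Iterator<M> iterator() {
           return new Iterator<M>() {
               private int index = 0;  // progress is kept inside the iterator
   
               @Override
               public boolean hasNext() {
                   return index < numMessages;
               }
   
               @Override
               public M next() {
                   if (!hasNext()) {
                       throw new NoSuchElementException();
                   }
                   return messageAt.apply(index++);
               }
           };
       }
   }
   ```
   
   No intermediate batch object is built here: each call to `next()` produces one message, and the only extra allocation compared with a plain for loop is the iterator itself.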





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922449420


   @eolivelli 
   
   > Otherwise we risk to fail to support compatibility for future versions.
   
   I don't quite understand the compatibility issue. The interface is part of the **pulsar-common** package. If there are no incompatible changes to `BrokerEntryMetadata` and `MessageMetadata`, it should be stable.
   
   ----
   
   Actually there's a similar API. When creating a producer, we can configure a `BatcherBuilder` interface.
   
   https://github.com/apache/pulsar/blob/fccc1cf60467bc0600c6507271cff85146d9f9e8/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatcherBuilder.java#L56-L60
   
   However, this `build` method is meaningless. Its declared type is `BatchMessageContainer`, but the type that is actually required is `BatchMessageContainerBase`.
   
   https://github.com/apache/pulsar/blob/fccc1cf60467bc0600c6507271cff85146d9f9e8/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L235-L236
   
   `BatchMessageContainerBase` needs to depend on `OpSendMsg`, `ProducerImpl`, etc., so it cannot be part of the **pulsar-client-api** package. `BatchMessageContainer` is just a fake interface.
   
   We could take a similar approach for `PayloadConverter`, but IMO it's no better than loading the class at runtime.





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922405741


   @eolivelli `ConsumerBuilder` is a class in the **pulsar-client-api** module, which has no dependencies. If **pulsar-common** were added as a dependency of **pulsar-client-api**, it would cause a cyclic dependency.
   
   ```
   org.apache.pulsar:pulsar-common:jar:2.9.0-SNAPSHOT
   +- org.apache.pulsar:pulsar-client-api:jar:2.9.0-SNAPSHOT:compile
   ```
   
   Also we can't put `PayloadConverter` into **pulsar-client-api** package because we must import `BrokerEntryMetadata` and `MessageMetadata` from **pulsar-common** package.
   
   If we changed the `PayloadConverter` interface to
   
   ```java
   interface PayloadConverter {
       ByteBuf convert(ByteBuf headerAndPayload);
   }
   ```
   
   Maybe it could work, but we would need to parse `BrokerEntryMetadata` and `MessageMetadata` repeatedly in all valid implementations. BTW, we would also need to import the Netty dependency for `ByteBuf`. I'm not sure whether it's appropriate for the **pulsar-client-api** package to do that.





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927920268


   So can we rename `newSingleMessage` to `getMessageAt` and `newMessage` to `asSingleMessage`?
   
   ```
   EntryContext {
        int numMessages();
        Message getMessageAt(index.....)
        Message asSingleMessage()
   }
   ```
   





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-928651728


   @eolivelli Regarding the `numMessages` parameter, when I tried to remove it I found that it's still necessary.
   
   The entry context is based on the `MessageMetadata`. However, the actual number of messages might be contained in the payload. For example, if the message was published from KoP, the actual metadata is contained in the payload and the `MessageMetadata` is a false header. In this case, we need to parse the payload, then pass the actual number of messages.
   
   Similarly, these methods are all based on `MessageMetadata`:
   
   ```java
   public interface EntryContext {
   
       String getProperty(String key);
   
       int getNumMessages();
   
       boolean isBatch();
   
       /* ... */
   }
   ```
   
   That's also one of the reasons I renamed `MessageContext` to `EntryContext`: the context of the entry might not be the same as the context of the message.
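   
   To illustrate why the count may have to come from the payload, here is a small sketch. The layout is invented for this example (a 4-byte big-endian record count followed by the records) and is not Kafka's real record format. `PayloadCountSketch` and its method names are hypothetical as well.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Hypothetical payload layout for illustration only (this is not Kafka's real record format):
   // a 4-byte big-endian record count followed by the record bytes.
   final class PayloadCountSketch {
       // Reads the count with an absolute get, so the buffer's position is left untouched.
       static int actualNumMessages(ByteBuffer payload) {
           return payload.getInt(payload.position());
       }
   
       // Prefers the count parsed from the payload over the one carried by the metadata.
       static int resolveNumMessages(int numMessagesInMetadata, ByteBuffer payload, boolean countIsInPayload) {
           return countIsInPayload ? actualNumMessages(payload) : numMessagesInMetadata;
       }
   }
   ```
   
   With such a layout the metadata could report a single message while the payload actually holds several, which is the situation described above.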





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922749723


   second topic:
   the client must be able to auto-detect the "format" of the payload and then automatically apply the matching converter to the messages tagged with that format; the only constraint is that the proper "MessageConverter" is on the classpath
   





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925808096


   @eolivelli I've just made a lot of changes for this proposal. PTAL
   
   Some differences from the previous thoughts:
   1. Only configure one converter to avoid extra complexity.
   2. The `messageFormat` field was not added to `MessageMetadata`, though it wouldn't take much effort to do that. Checking the property instead remains compatible with messages produced by an older Pulsar API.
   
   Here's the branch for the latest proposal, though it still needs some work: https://github.com/BewareMyPower/pulsar/tree/TEST-PIP-96
   
   @codelipenghui Please also take a look since I've changed the returned value to `Iterable<Message<T>>`.





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930708942


   > so maybe BatchContext is more reasonable here? 
   
   I don't think so, because the Entry might be just a single message, not a batch. You can also see the related docs in `EntryContext`:
   
   ```java
       /**
        * Check whether the entry is a batch.
        *
        * @return true if the entry is a batch.
        */
       boolean isBatch();
   ```
   
   I've used `MessageContext` before, but it might be misunderstood as the context of a `Message<T>`.
   
   Your concern is right. `Entry` is a concept from BookKeeper. For Pulsar client users, it might look a little unfamiliar. Maybe we can rename it to `PayloadContext`, since it's the context of a payload?





[GitHub] [pulsar] codelipenghui commented on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930697502


   The proposal LGTM, thanks @eolivelli @BewareMyPower for the great context and discussion. I only have one point, about the name `EntryContext`: the entry is a concept from BookKeeper, and currently we don't have any concept of an entry on the client side, so maybe BatchContext is more reasonable here? Great proposal! @BewareMyPower





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930048387


   we should also make it clear to what happens to the we should move everything in this form
   
   `<T> void convert(EntryContext context, MessagePayload payload, Schema<T> schema, Consumer<Message<T>> consumer);`
   this way the loop is performed inside the method body:
   * the lifecycle is clearer
   * no need for additional lifecycle methods
   * in the future it can be called by **multiple threads**
   * the `convert` method has full control over the loop and can handle errors/refcounts better
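   
   A minimal sketch of the callback form, under stated assumptions: `CallbackProcessor` and `CountingCsvProcessor` are hypothetical, the type parameters stand in for `MessagePayload` and `Message<T>`, and the comma-separated payload is an invented format. Because the loop lives inside the method, per-call bookkeeping can sit in a `finally` block instead of a separate lifecycle method.
   
   ```java
   import java.util.function.Consumer;
   
   // Hypothetical callback-style processor; P and M stand in for MessagePayload and Message<T>.
   interface CallbackProcessor<P, M> {
       void process(P payload, Consumer<M> messageConsumer);
   }
   
   // Treats the payload as comma-separated messages (an invented format for this sketch).
   final class CountingCsvProcessor implements CallbackProcessor<String, String> {
       private long totalChars = 0;
   
       @Override
       public void process(String payload, Consumer<String> messageConsumer) {
           long chars = 0;
           try {
               for (String message : payload.split(",")) {
                   chars += message.length();
                   messageConsumer.accept(message);
               }
           } finally {
               // Runs even if the consumer throws, so no separate cleanup() hook is needed.
               totalChars += chars;
           }
       }
   
       long totalChars() {
           return totalChars;
       }
   }
   ```
   
   The counter in this sketch is a plain field, so calling `process` from several threads would additionally require synchronization or an atomic counter.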





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930708942


   > so maybe BatchContext is more reasonable here? 
   
   I don't think so, because the Entry might be just a single message, not a batch. You can also see the related docs in `EntryContext`:
   
   ```java
       /**
        * Check whether the entry is a batch.
        *
        * @return true if the entry is a batch.
        */
       boolean isBatch();
   ```
   
   I've used `MessageContext` before, but it might be misunderstood as the context of a `Message<T>`.
   
   Your concern is right. `Entry` is a concept from BookKeeper. For Pulsar client users, it might look a little unfamiliar. ~~I think we can rename it `PayloadContext`? Since it's a context of a payload.~~ (I rejected this idea because it would cause confusion when `getNumMessages()` doesn't return the actual number of messages, since that number is stored in the payload.)





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-930733199


   After thinking about it for a while, I find `PayloadContext` clearer. My previous concerns can be addressed by the documentation.
   
   - PayloadContext: the context of a message payload
   - MessagePayload: the message payload itself
   - PayloadProcessor: the processor for a message payload
   
   It looks more consistent.





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-929315736


   The current design of `MessagePayload` still looks a little weird, especially the `copiedBuffer()` method. I might make some changes to `MessagePayload` tomorrow.





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-927884920


   `default void cleanup() {}`
   It is not clear to me when this method will be called, when we are closing the `Consumer` ?
   It has no reference to the `EntryContext`, so it is not related to `convert`





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922405741


   @eolivelli `ConsumerBuilder` is a class in **pulsar-client-api** module that has no dependency. If **pulsar-common** was added as the dependency of **pulsar-client-api**, cyclic dependency would happen.
   
   ```
   org.apache.pulsar:pulsar-common:jar:2.9.0-SNAPSHOT
   +- org.apache.pulsar:pulsar-client-api:jar:2.9.0-SNAPSHOT:compile
   ```
   
   Also we can't put `PayloadConverter` into **pulsar-client-api** package because we must import `BrokerEntryMetadata` and `MessageMetadata` from **pulsar-common** package.
   
   If we changed the `PayloadConverter` interface to
   
   ```java
   interface PayloadConverter {
       ByteBuf convert(ByteBuf headerAndPayload);
   }
   ```
   
   Maybe it could work, but we would need to parse `BrokerEntryMetadata` and `MessageMetadata` repeatedly in all valid implementations. BTW, we would also need to import the Netty dependency for `ByteBuf`. I'm not sure whether it's appropriate for the **pulsar-client-api** package to do that.





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922900220


   > `Payload convert(MessageContext message, Payload payload)`
   > 
   > with
   > 
   > ```
   > interface MessageContext {
   >              ... getMetadata(....)   
   >      }
   > ```
   > 
   > and a high level wrapper
   > 
   > ```
   >  interface Payload {
   >        int size()
   >        getInputStream()
   >        copyTo(....)
   >        static Payload wrap(byte[])
   >        static Payload wrap(ByteBuffer....)
   >        ...add whatever is needed...
   >      }
   > ```
   
   Good suggestion. I'll think about some proper interfaces for it.
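
   A minimal sketch of what such a wrapper could look like, based only on the operations named in the quoted pseudocode. The signatures of `size`, `getInputStream`, `copyTo` and `wrap` are assumptions, and `ArrayPayload` is a hypothetical helper; none of this is an existing Pulsar API.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.InputStream;
   import java.nio.ByteBuffer;

   // Hypothetical high-level wrapper; signatures are assumed for illustration.
   interface Payload {
       int size();

       InputStream getInputStream();

       // Copies the whole payload into dst, starting at dstOffset.
       void copyTo(byte[] dst, int dstOffset);

       static Payload wrap(byte[] bytes) {
           // Defensive copy so later changes to the caller's array are not visible.
           return new ArrayPayload(bytes.clone());
       }

       static Payload wrap(ByteBuffer buffer) {
           byte[] copy = new byte[buffer.remaining()];
           // duplicate() leaves the caller's position and limit untouched.
           buffer.duplicate().get(copy);
           return new ArrayPayload(copy);
       }
   }

   // Hypothetical heap-backed implementation of the wrapper.
   final class ArrayPayload implements Payload {
       private final byte[] bytes;

       ArrayPayload(byte[] bytes) {
           this.bytes = bytes;
       }

       @Override
       public int size() {
           return bytes.length;
       }

       @Override
       public InputStream getInputStream() {
           return new ByteArrayInputStream(bytes);
       }

       @Override
       public void copyTo(byte[] dst, int dstOffset) {
           System.arraycopy(bytes, 0, dst, dstOffset, bytes.length);
       }
   }
   ```

   An interface of this shape would not expose `ByteBuf`, so an API module using it would need no Netty dependency, at the cost of an extra copy in this naive implementation.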
   
   > second topic:
   > the client must be able to auto detect the "format" of the payload and then apply it automatically to the messages tagged with that format, the only constraint is that in the classpath you have the proper "MessageConverter"
   
   If I understand correctly, I think that's part of the converter's implementation. In the previous PIP, I used two methods, one of which was for detecting the format. In this PIP, I merged them into one because format detection is usually simple. See https://github.com/apache/pulsar/pull/12088/files#diff-f1e742e597b262e9da738239c878d0a0f3429f4484bc9461cb6a6f838c6a138a as an example.
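
   To illustrate what merging detection into conversion means, here is a rough sketch that does not use the real Pulsar types. `SketchPayloadConverter`, `DemoFormatConverter`, the `entry.format` property key and the `demo` format name are all hypothetical; the upper-casing merely stands in for a real format conversion.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Locale;
   import java.util.Map;

   // Hypothetical single-method converter: detection and conversion in one call.
   interface SketchPayloadConverter {
       byte[] convert(Map<String, String> properties, byte[] payload);
   }

   final class DemoFormatConverter implements SketchPayloadConverter {
       // Assumed property key and value; a real converter would define its own.
       static final String FORMAT_KEY = "entry.format";
       static final String FORMAT = "demo";

       @Override
       public byte[] convert(Map<String, String> properties, byte[] payload) {
           // Detection: a single property lookup. Unknown formats pass through unchanged.
           if (!FORMAT.equals(properties.get(FORMAT_KEY))) {
               return payload;
           }
           // Conversion: placeholder transformation standing in for real decoding.
           String text = new String(payload, StandardCharsets.UTF_8);
           return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
       }
   }
   ```

   Because an entry of an unrecognized format is returned as-is, a consumer configured with such a converter could still read topics that contain ordinary entries.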





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922338129


   Why can't we pass an instance of the converter instead of using classpath lookup?





[GitHub] [pulsar] BewareMyPower commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-922235898


   There is something to add about the implementation.
   
   Before this proposal, I tried adding the following method.
   
   ```java
       /**
        * Set the payload converter class.
        *
        * <p>The class must implement {@code org.apache.pulsar.common.api.raw.PayloadConverter} from the pulsar-common
        * module. Otherwise, this method fails silently. Once the converter is set, the raw payload of each arriving
        * message is replaced by the new payload returned by {@code PayloadConverter#convert(ByteBuf)}.
        *
        * @param clazz the converter class
        * @return the consumer builder instance
        */
       ConsumerBuilder<T> payloadConverterClass(Class<?> clazz);
   ```
   
   It's simple but looks ugly. After discussing with @codelipenghui, I adopted the current approach of loading the converter at runtime, with one difference. @codelipenghui's original opinion was that no changes should be made to client configs like `ConsumerBuilder`. However, that solution has a flaw: reflection adds overhead when creating a consumer. Though the overhead might not be significant, we should still avoid it in the regular case.
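
   For reference, loading a converter by class name at runtime could look roughly like the sketch below. `SketchConverter`, `KafkaSketchConverter` and `ConverterLoader` are hypothetical names, not the actual implementation; only `Class.forName` and `getDeclaredConstructor().newInstance()` are standard Java reflection calls.

   ```java
   // Hypothetical converter type used only for this sketch.
   interface SketchConverter {
       String format();
   }

   final class KafkaSketchConverter implements SketchConverter {
       @Override
       public String format() {
           return "kafka";
       }
   }

   final class ConverterLoader {
       // Looks up the class by name and instantiates it via its no-arg constructor.
       static SketchConverter load(String className) {
           try {
               Class<?> clazz = Class.forName(className);
               if (!SketchConverter.class.isAssignableFrom(clazz)) {
                   throw new IllegalArgumentException(
                           className + " does not implement SketchConverter");
               }
               return (SketchConverter) clazz.getDeclaredConstructor().newInstance();
           } catch (ReflectiveOperationException e) {
               throw new IllegalStateException("Cannot load converter " + className, e);
           }
       }
   }
   ```

   If every consumer went through such a lookup on creation, the reflective cost would also be paid by consumers that need no converter at all, which is the overhead mentioned above.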
   
   I've also thought of another solution in which all consumers share the same converter, so that the class loader would only be invoked once. But it might become complicated if there are multiple entry formats. For example, assume we've enabled KoP, AoP and MoP on the same broker, so there are three different entry formats. With PIP 96, we can implement three converters like:
   
   - org.apache.pulsar.client.converter.kop.KafkaPayloadConverter
   - org.apache.pulsar.client.converter.aop.AmqpPayloadConverter
   - org.apache.pulsar.client.converter.mop.MqttPayloadConverter
   
   Then we can create three consumers using a common `PulsarClient`. It's also why I configured the converter package at consumer level instead of client level.
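
   The consumer-level configuration can be pictured with the following sketch. `SketchClient` and `SketchConsumer` are hypothetical stand-ins, not `PulsarClient` or `Consumer`, and a plain `Function<String, String>` stands in for a converter.

   ```java
   import java.util.function.Function;

   // Hypothetical consumer that applies its own converter to each raw entry.
   final class SketchConsumer {
       private final Function<String, String> converter;

       SketchConsumer(Function<String, String> converter) {
           this.converter = converter;
       }

       String receive(String rawEntry) {
           return converter.apply(rawEntry);
       }
   }

   // Hypothetical shared client: it holds no converter of its own.
   final class SketchClient {
       SketchConsumer newConsumer(Function<String, String> converter) {
           return new SketchConsumer(converter);
       }
   }
   ```

   With this shape, one client can create a consumer per entry format, for example `client.newConsumer(raw -> "kafka:" + raw)` next to `client.newConsumer(raw -> "amqp:" + raw)`, without the converters interfering with each other.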





[GitHub] [pulsar] BewareMyPower edited a comment on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
BewareMyPower edited a comment on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-929315736


   The current design of `MessagePayload` still looks a little weird, especially the `copiedBuffer()` method. I might make some changes to `MessagePayload` tomorrow.





[GitHub] [pulsar] eolivelli commented on issue #12087: PIP 96: Payload converter for Pulsar client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-925824235


   I really love this new proposal.
   But I don't understand how `newSingleMessage` works.
   Is it for creating batched messages?
   
   Can you sketch an example in pseudocode?
   
   Apart from that, I think we are on our way.





[GitHub] [pulsar] codelipenghui commented on issue #12087: PIP 96: Message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #12087:
URL: https://github.com/apache/pulsar/issues/12087#issuecomment-931093188


   @BewareMyPower LGTM.





[GitHub] [pulsar] codelipenghui closed issue #12087: PIP 96: Message payload processor for Pulsar client

Posted by GitBox <gi...@apache.org>.
codelipenghui closed issue #12087:
URL: https://github.com/apache/pulsar/issues/12087


   

