Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/08 05:21:39 UTC

[GitHub] [pulsar] BewareMyPower opened a new issue #11962: PIP 94: Message converter at broker level

BewareMyPower opened a new issue #11962:
URL: https://github.com/apache/pulsar/issues/11962


   ## Motivation
   
   The initial motivation came from Kafka's protocol handler, KoP (https://github.com/streamnative/kop). KoP allows the entry format to be configured as `kafka`, which means messages from a Kafka producer can be written to bookies directly, without any conversion between Kafka's and Pulsar's message formats. This improves performance significantly. However, it also introduces a limitation: Pulsar consumers cannot consume such a topic, because they cannot recognize the entry format.
   
   This proposal introduces a message converter, which is responsible for converting the buffer of an entry into a format that Pulsar consumers can recognize. Once a converter is configured, before dispatching messages to Pulsar consumers, the broker checks whether each buffer needs to be converted and performs the conversion if necessary. Multiple converters can be configured, just as multiple protocol handlers can, since each protocol handler could write entries in its own format.
   
   The benefits after this change:
   
   - When other clients write an entry, no conversion is needed.
   - When other clients read an entry, no conversion is needed.
   - When a Pulsar consumer reads an entry, the conversion will be performed in broker.
   
   Before this change, if we want Pulsar consumers to interoperate with other clients:
   
   - When other clients write an entry, we need to convert it to Pulsar format.
   - When other clients read an entry, we need to convert it from Pulsar format to the specific format.
   - When a Pulsar consumer reads an entry, no conversion is needed.
   
   This proposal is mainly for protocol handlers, because they can access `PersistentTopic` and write bytes to bookies directly. In the rare case where users write to the topic's ledger directly with a BookKeeper client, the converter can handle that as well.
   
   ## Goal
   
   The goal of this proposal is only to add message converters at the broker level. Once the related broker configs are enabled, the converters apply to all topics. This adds overhead even to topics that are used only by Pulsar clients, because every buffer must be checked to determine whether it needs to be converted. See the `MessageConverter#accept` method in the next section.
   
   In the future, message converters could be configured at the namespace or topic level. We could even configure a message converter in the Pulsar client, so that the conversion happens only on the client side and the broker's CPU load is reduced.
   
   ## API changes
   
   First, an interface is added under the package `org.apache.pulsar.common.api.raw`:
   
   ```java
   package org.apache.pulsar.common.api.raw;
   
   import io.netty.buffer.ByteBuf;
   
   public interface MessageConverter {
   
       /**
        * Determine whether the buffer can be converted
        *
        * @param buffer the buffer that might need to be converted
        * @return whether the buffer can be converted
        */
       boolean accept(ByteBuf buffer);
   
       /**
        * Convert the buffer to the format that Pulsar consumer can recognize.
        *
        * @param originalBuffer the original buffer
        * @return the converted buffer
        */
       ByteBuf convert(ByteBuf originalBuffer);
   }
   ```
   
   Then a new configuration is added:
   
   ```java
       @FieldContext(
               category = CATEGORY_PLUGIN,
               doc = "List of message converters, which are responsible for converting entries before dispatching. If"
                       + " multiple converters accept the same payload, the one listed first is used."
       )
       private List<String> messageConverters;
   ```
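   The config doc says that when several converters accept the same payload, the one listed first wins. A minimal sketch of that selection rule follows. It is not the proposal's code: it uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, and `ConverterChain` is a hypothetical helper name.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.List;
   
   // Stand-in for the proposed interface, with java.nio.ByteBuffer instead of Netty's ByteBuf.
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Hypothetical helper: applies the first converter, in config order, that accepts the buffer.
   final class ConverterChain {
       private final List<MessageConverter> converters;
   
       ConverterChain(List<MessageConverter> converters) {
           this.converters = List.copyOf(converters); // keeps the order of `messageConverters`
       }
   
       ByteBuffer apply(ByteBuffer buffer) {
           for (MessageConverter converter : converters) {
               if (converter.accept(buffer)) {
                   return converter.convert(buffer); // earlier entries take precedence
               }
           }
           return buffer; // no converter accepted it, so the entry is dispatched unchanged
       }
   }
   ```
   
   Returning the original buffer when nothing accepts it keeps plain Pulsar entries on the fast path, at the cost of one `accept` call per configured converter.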
   
   ## Implementation
   
   The implementation is simple. When the broker starts, it loads all classes listed in the `messageConverters` config that implement the `MessageConverter` interface, and then passes the converters to `ServerCnx`. Each time a dispatcher dispatches messages to a consumer, it eventually calls the `ServerCnx#newMessageAndIntercept` method, where the conversion can be performed.
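   Loading the configured class names could look roughly like the sketch below. It assumes each converter has a no-argument constructor (the proposal does not say), uses plain reflection rather than whatever plugin loader the broker would actually use, and again substitutes `ByteBuffer` for `ByteBuf`. `MessageConverterLoader` and `PassThroughConverter` are hypothetical names.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   // Stand-in for the proposed interface (ByteBuffer instead of Netty's ByteBuf).
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Example converter that a `messageConverters` entry could name.
   final class PassThroughConverter implements MessageConverter {
       @Override
       public boolean accept(ByteBuffer buffer) {
           return false;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           return originalBuffer;
       }
   }
   
   final class MessageConverterLoader {
       // Instantiates each configured class name, preserving list order.
       static List<MessageConverter> load(List<String> classNames, ClassLoader classLoader) {
           List<MessageConverter> converters = new ArrayList<>();
           for (String className : classNames) {
               Class<?> clazz;
               try {
                   clazz = Class.forName(className, true, classLoader);
               } catch (ClassNotFoundException e) {
                   throw new IllegalArgumentException("Converter class not found: " + className, e);
               }
               if (!MessageConverter.class.isAssignableFrom(clazz)) {
                   throw new IllegalArgumentException(className + " does not implement MessageConverter");
               }
               try {
                   converters.add((MessageConverter) clazz.getDeclaredConstructor().newInstance());
               } catch (ReflectiveOperationException e) {
                   throw new IllegalStateException("Cannot instantiate " + className, e);
               }
           }
           return converters;
       }
   }
   ```
   
   Failing at startup on a misconfigured class name is likely preferable to discovering the problem on the first dispatch.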
   
   For unit tests, we can test the following converters:
   
   1. `RejectAllConverter`: `accept` returns false, so no conversion is performed.
   2. `EchoConverter`: `accept` returns true and `convert` simply returns the original buffer.
   3. `BytesConverter`: an example of a real-world converter. The message has a `MessageMetadata` part containing the `entry.format=bytes` property, and the payload part is only the raw bytes, without `SingleMessageMetadata`. `BytesConverter#convert` converts the raw bytes to a format that Pulsar consumers can recognize.
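   The first two test converters are simple enough to sketch directly. This is only an illustration: it uses `ByteBuffer` in place of `ByteBuf`, adds a hypothetical `DispatchSketch.maybeConvert` helper that mirrors the "convert only if `accept` returns true" rule, and leaves out `BytesConverter`, since that one needs Pulsar's `MessageMetadata` and `SingleMessageMetadata` serialization.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Stand-in for the proposed interface (ByteBuffer instead of Netty's ByteBuf).
   interface MessageConverter {
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   }
   
   // Never accepts, so the entry is always dispatched unchanged.
   final class RejectAllConverter implements MessageConverter {
       @Override
       public boolean accept(ByteBuffer buffer) {
           return false;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           throw new UnsupportedOperationException("accept() always returns false");
       }
   }
   
   // Accepts everything and returns the original buffer untouched.
   final class EchoConverter implements MessageConverter {
       @Override
       public boolean accept(ByteBuffer buffer) {
           return true;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           return originalBuffer;
       }
   }
   
   final class DispatchSketch {
       // Hypothetical dispatch step: convert only when accept() is true.
       static ByteBuffer maybeConvert(MessageConverter converter, ByteBuffer entry) {
           return converter.accept(entry) ? converter.convert(entry) : entry;
       }
   }
   ```
   
   Both converters should leave dispatched bytes identical, which makes them useful for checking that wiring in a converter does not change behavior by itself.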


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915711069


   > If the user only had Kafka clients for now, they can choose entry.format=kafka for high performance. If one day they switched to Pulsar clients, they can configure message converters to consume old data.
   > If the user already had Kafka clients and Pulsar clients for the same topics, they must choose entry.format=pulsar now without PIP 94. In future, when they eliminate all Kafka clients, even including KoP itself, they'll find there's no way to consume old data.
   
   @BewareMyPower 
   Yes, I know this case. With `entry.format=pulsar`, the publishing side burdens the broker; with PIP-94, consuming from Pulsar clients burdens the broker. In my opinion, it is difficult for users to choose: if you have only one subscription reading the topic with a Pulsar client, you might choose the broker converter, but if you have 100 subscriptions reading the topic with Pulsar clients, you might prefer `entry.format=pulsar`.
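   A rough way to see this trade-off is to count conversions. The sketch below is a hypothetical back-of-the-envelope model, not a measurement: it assumes one conversion per entry on each side that needs one (following the before/after lists in the proposal), that the broker converter runs once per entry per Pulsar subscription with no caching, and it ignores redeliveries and the differing cost of each conversion.
   
   ```java
   // Hypothetical model: counts conversions only, not CPU time or memory.
   final class ConversionCount {
       // entry.format=pulsar: data from Kafka producers is converted once on publish,
       // and again for every Kafka consumer group that reads it.
       static long pulsarFormat(long entries, int kafkaConsumerGroups) {
           return entries + entries * kafkaConsumerGroups;
       }
   
       // entry.format=kafka plus a broker-side converter: Kafka readers need no conversion,
       // but every dispatch to a Pulsar subscription converts the entry again.
       static long kafkaFormatWithBrokerConverter(long entries, int pulsarSubscriptions) {
           return entries * pulsarSubscriptions;
       }
   }
   ```
   
   With 1,000 entries, no Kafka readers, and one Pulsar subscription, both give 1,000 conversions; with 100 Pulsar subscriptions, the broker converter needs 100,000, which is the case where `entry.format=pulsar` looks more attractive.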
   
   And if we also introduce a data format processor on the client side, users will have three options: `entry.format` for KoP, `messageConverters` for the broker, and a data format processor for the client side. This would make it very complicated to use.
   
   In my opinion, 
   
   1. If users want to achieve high performance, they should avoid broker-side data format conversion.
   2. If users do not care about performance, they can use `entry.format=pulsar`, with no need to add more dependencies on the client side.
   
   The proposal looks like an `intermediate` approach for users who want to get high performance. I agree PIP-94 can bring performance improvements to certain scenarios, but considering the complexity and the broker resource consumption that PIP-94 might bring to Pulsar, we need to think about it carefully.
   





[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915730942


   @BewareMyPower Until we provide a way to consume Kafka-format data in the Pulsar client, users should avoid `entry.format=kafka` if they plan to consume the data with Pulsar clients in the future. Also, `pulsar` is the default entry format for KoP: https://github.com/streamnative/kop/blob/330c3bf084104a548eb28f310860ac519c45d999/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java#L336.
   
   In any case, if users want better performance, they need to make some changes, either on the broker side or on the client side. Why not consider a more efficient way? In other words, users who change the format to `kafka` expect high performance, but if we increase the broker's burden, will that still meet their performance requirements?
   
   If users configured the wrong format earlier, they should migrate the topic.





[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915274820


   @codelipenghui You can see https://github.com/streamnative/kop/pull/673 for more details about the GC problem. In this PR, I used the direct memory allocator, and the GC problem was fixed. The remaining heap memory usage is a KoP-side problem, unrelated to data format conversion. Before this PR, deserializing Kafka records did not use heap memory.
   
   1. Kafka records to Pulsar message
      - Read records (no memory allocated)
      - Create Pulsar message (memory allocated from direct memory)
   2. Pulsar message to Kafka records
      - Read message (no memory allocated)
      - Create Kafka records (the default `MemoryRecords` allocates memory from the heap, which triggers GC; this problem was solved by https://github.com/streamnative/kop/pull/673)
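   The difference between the two kinds of allocation can be seen with the JDK alone. The sketch below is only an analogy: KoP and Pulsar use Netty buffers rather than `java.nio.ByteBuffer`, the actual change is in the linked PR, and `AllocationSketch` is a hypothetical name.
   
   ```java
   import java.nio.ByteBuffer;
   
   final class AllocationSketch {
       // Direct buffer: the bytes live outside the Java heap, so large payloads do not
       // fill the heap (only the small ByteBuffer object itself is heap-allocated).
       static ByteBuffer copyToDirect(byte[] records) {
           ByteBuffer buffer = ByteBuffer.allocateDirect(records.length);
           buffer.put(records);
           buffer.flip(); // make the copied bytes readable
           return buffer;
       }
   
       // Heap buffer: backed by a byte[] on the Java heap, which the GC must manage.
       static ByteBuffer copyToHeap(byte[] records) {
           ByteBuffer buffer = ByteBuffer.allocate(records.length);
           buffer.put(records);
           buffer.flip();
           return buffer;
       }
   }
   ```
   
   Direct memory is not free either: it must be released explicitly (or via reference counting in Netty), which is why pooled direct allocators are typically used for message buffers.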
   
   





[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915716242


   @wangjialing218 
   
   > We could consider how to keep FIFO semantics when using multiple managed ledgers. For example, only one primary ledger is writeable (store original messages from producer), other ledgers are readable (store conversion result for each message from primary ledger), and keep the message order same in all ledgers. consumer could select primary ledger or one readable ledger to consume messages.
   
   @wangjialing218 It looks like this requires copying data between different managed ledgers? Then we would have multiple copies across the primary managed ledger and the other managed ledgers.
   
   > I have also considerd convert message at broker level from other motivation (not only for protocol handler).
   > Currently, if we want to do some customized message conversion before send messages to consumer, we could use pulsar funtion to consume messages from the original topic, do the conversion and publish result to another topic. This is a inefficiency way that would cost much more network and storage.
   
   I think it's not the same scenario as PIP-94, right? PIP-94 converts the data encoding format and does not touch the user's data. If you want to convert the user's data, the broker needs to deserialize it, which brings more GC workload to the broker. Yes, doing data conversion on the broker side can reduce the network workload, but it increases the CPU workload and the burden of JVM GC, so you might get a less stable broker.
   
   > The purpose of multiple managed ledgers for a topic is to do message conversion asynchronously. This will cost more storage but no need to sacrifice performance neither producer nor consumer side.
   
   But there are also many disadvantages: we need to maintain metadata for each managed ledger, there are more data copies, and more entries are written to bookies.





[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915241763


   @eolivelli Both KoP and the client-side converter are optional plugins, not required. If users prefer to convert the data on the broker side, the existing KoP implementation already supports this. They can just convert the data on the publish side, with no need to convert it multiple times on the consumption side.
   
   I think we need to clarify the purpose of this proposal: we are trying to find a way to support efficient data conversion between different formats such as Kafka and Pulsar. The initial motivation was that we saw a performance issue in KoP because it did data conversion on the broker side, so we introduced the entry format (Kafka or Pulsar) in KoP to avoid the data conversion.
   
   This does not prevent users from using broker-side data conversion; it just provides a more efficient way to handle data conversion, as one more choice for users. But if we did the data conversion on the broker side, users could only choose one solution. (Essentially, there is no difference between a publishing converter and a consumption converter, and the consumption converter is more expensive than the publishing one, as @wangjialing218 mentioned here: https://github.com/apache/pulsar/issues/11962#issuecomment-914962880)





[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915269780


   @codelipenghui The GC problem in KoP is caused by heap memory usage, not by the conversion. In theory, the converter only affects CPU, because all memory used for messages is allocated from direct memory.
   
   > I think we need to clarify the purpose of this proposal, we are trying to find a way to support efficient data conversion between different formats such as Kafka, Pulsar. 
   
   It's not. Right, introducing the entry format in KoP was meant to avoid data conversion, because we tend to push users to use pure Kafka clients with KoP. However, if they want to switch to Pulsar clients later, it could be impossible, because there's no way for Pulsar consumers to consume the messages produced by Kafka producers.
   
   Actually, it's right that the conversion should be performed on the client side. But upgrading the client is not always easy. @eolivelli was right that this is not always possible because downstream users may not be aware of the format of the data.
   
   > If users tend to convert the data at the broker side, the existing KoP implementation already supports this requirement. 
   
   No. It requires configuring `entry.format=pulsar`, and then interaction among Kafka clients could perform poorly.
   
   In conclusion, supporting message converters at both the broker level and the client level is necessary. Each has pros and cons.
   - Broker support: All Pulsar clients are supported, but the conversion overhead might affect the whole broker.
   - Client support: Only the latest Pulsar client (e.g. 2.9.0) is supported, so users must upgrade their client version. But the conversion overhead won't affect the whole broker.
   
   It's a trade-off between **availability** and **performance**. I chose **availability**, so I tried to implement the message converter on the broker side first. The message converter here can also be applied to the Pulsar client. I'm not sure whether it's appropriate to include two tasks in one PIP. If so, I can add client-side support to this proposal.





[GitHub] [pulsar] wangjialing218 commented on issue #11962: PIP 94: Message converter at broker level

wangjialing218 commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915702801


   > @wangjialing218
   > 
   > > Just a idea. Current there is one ManagedLedger(ledger) associated with PersistentTopic. Could we add another ManagedLedger(kafkaLedger) associated with the topic.
   > 
   > We can't use multiple managed ledgers for a topic, this will break the FIFO semantics if you have Kafka producers and Pulsar producers publishing data to the topic and Kafka consumers, Pulsar consumers to consume the data from the topic.
   > And to read data repeatedly, we need to ensure the same reading order.
   
   @codelipenghui We could consider how to keep FIFO semantics when using multiple managed ledgers. For example, only one primary ledger is writable (storing the original messages from producers), while the other ledgers are read-only (storing the conversion result of each message from the primary ledger), and the message order is kept the same in all ledgers. Consumers could select the primary ledger or one of the read-only ledgers to consume messages from.
   
   I have also considered converting messages at the broker level for other reasons (not only for protocol handlers).
   Currently, if we want to apply some customized conversion before sending messages to consumers, we could use a Pulsar Function to consume messages from the original topic, do the conversion, and publish the result to another topic. This is an inefficient approach that costs much more network bandwidth and storage.
   And if the conversion is something like removing sensitive information from the original message, which should not be sent to a particular consumer, it's not suitable to do the conversion on the consumer side.
   
   The purpose of multiple managed ledgers for a topic is to do message conversion asynchronously. This costs more storage but doesn't sacrifice performance on either the producer or the consumer side.
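   The primary/read-only ledger idea can be modeled in memory to show why the order is preserved. This is a hypothetical sketch, not Pulsar code: plain lists stand in for managed ledgers, `catchUp()` stands in for the asynchronous conversion task, `DerivedLog` is a made-up name, and it does not address the extra metadata and storage cost raised in the discussion.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.UnaryOperator;
   
   // Hypothetical in-memory model: one writable primary log plus one read-only derived log
   // holding the converted form of every primary entry, in the same order.
   final class DerivedLog<T> {
       private final List<T> primary = new ArrayList<>();
       private final List<T> derived = new ArrayList<>();
       private final UnaryOperator<T> converter;
   
       DerivedLog(UnaryOperator<T> converter) {
           this.converter = converter;
       }
   
       // Producers append only to the primary log; returns the entry's index.
       synchronized int append(T entry) {
           primary.add(entry);
           return primary.size() - 1;
       }
   
       // Stand-in for the asynchronous conversion task: converts pending entries strictly
       // in primary order, so derived index i always corresponds to primary index i.
       synchronized int catchUp() {
           int converted = 0;
           while (derived.size() < primary.size()) {
               derived.add(converter.apply(primary.get(derived.size())));
               converted++;
           }
           return converted;
       }
   
       synchronized T readPrimary(int index) {
           return primary.get(index);
       }
   
       // Consumers of the derived log can only read entries that have been converted so far.
       synchronized T readDerived(int index) {
           return derived.get(index);
       }
   
       synchronized int derivedSize() {
           return derived.size();
       }
   }
   ```
   
   Because only the primary log accepts writes, the derived log can lag behind but never reorder entries; the lag is what consumers of the derived log would observe as extra latency.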





[GitHub] [pulsar] codelipenghui edited a comment on issue #11962: PIP 94: Message converter at broker level

codelipenghui edited a comment on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915730942


   @BewareMyPower Until we provide a way to consume Kafka-format data in the Pulsar client, users should avoid `entry.format=kafka` if they plan to consume the data with Pulsar clients in the future. Also, `pulsar` is the default entry format for KoP: https://github.com/streamnative/kop/blob/330c3bf084104a548eb28f310860ac519c45d999/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java#L336.
   
   In any case, if users want better performance, they need to make some changes, either on the broker side or on the client side. Why not consider a more efficient way? In other words, users who change the format to `kafka` expect high performance, but if we increase the broker's burden, will that still meet their performance requirements?
   
   Until we provide a solution, users who configured the wrong format earlier should migrate the topic.





[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914947340


   Protocol handlers can be used on the publish side.
   For example, in KoP, if messages were produced by a Pulsar producer, KoP converts them to Kafka format when it reads the entries from BookKeeper, so that Kafka consumers can receive them.
   See https://github.com/streamnative/kop/pull/632 for details.
   
   Thanks,
   Yunze Xu
   
   > On Sep 8, 2021, at 2:03 PM, Enrico Olivelli ***@***.***> wrote:
   > 
   > 
   > do we need do add a Converter on the Publish side ?
   > IIUC this Converter is for the Consumer
   
   





[GitHub] [pulsar] wangjialing218 commented on issue #11962: PIP 94: Message converter at broker level

wangjialing218 commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914962880


   If I use a Kafka client to produce messages and a Pulsar client to consume them, message publishing performance could improve, but message dispatch performance may degrade.
   If the topic has multiple subscriptions, one message could be dispatched to multiple consumers. Does the conversion of this message happen multiple times in this case?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915795151


   After discussing with @codelipenghui and @hangc0276, I think it's better to add the converter on the client side. The main reason is that we cannot find a good place in the broker to insert the converter.
   
   Currently, we would have to convert entries in the callback of `asyncReadEntries`, like https://github.com/apache/pulsar/blob/3cd5b9ed3b59dd153b04ce4f6e61da377900f9be/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L446-L450
   
   After inserting the converter, it could look like this:
   
   ```java
       public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
           entries = converterList.convert(entries); // wrapper that applies the configured converters
           ReadType readType = (ReadType) ctx;
           // ... the rest of the existing method is unchanged
       }
   ```
   
   `readEntriesComplete` runs **in a single thread**. Even if `convert` took only 1 millisecond per call (for the Kafka entry converter it could actually take 5 milliseconds or more), the thread could be blocked and **all other topics could be affected**. It would be dangerous to configure the converter on the broker.
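   The risk described above can be reproduced in miniature with a single-threaded executor. This is an illustration under stated assumptions, not the broker's threading code: one executor thread stands in for the thread running `readEntriesComplete`, `Thread.sleep` stands in for a slow per-entry conversion, and `SingleThreadBlockingSketch` is a hypothetical name.
   
   ```java
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   
   final class SingleThreadBlockingSketch {
       // Submits a slow "conversion" for topic A, then a trivial read callback for topic B,
       // to the same single thread. Returns how many milliseconds topic B waited to start.
       static long topicBWaitMillis(int entries, long convertMillisPerEntry) {
           ExecutorService dispatchThread = Executors.newSingleThreadExecutor();
           try {
               long submittedAt = System.nanoTime();
               dispatchThread.submit(() -> {
                   for (int i = 0; i < entries; i++) {
                       sleepQuietly(convertMillisPerEntry); // simulated per-entry conversion
                   }
               });
               Callable<Long> topicBCallback = System::nanoTime; // records when topic B finally runs
               Future<Long> topicB = dispatchThread.submit(topicBCallback);
               return TimeUnit.NANOSECONDS.toMillis(topicB.get() - submittedAt);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           } catch (ExecutionException e) {
               throw new IllegalStateException(e);
           } finally {
               dispatchThread.shutdownNow();
           }
       }
   
       private static void sleepQuietly(long millis) {
           try {
               Thread.sleep(millis);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }
   }
   ```
   
   With 20 entries at 5 ms each, topic B cannot start until roughly 100 ms have passed, even though its own work is trivial.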
   
   I'll open a new PIP to add a message converter to the Java client. Other clients may need extra effort.
   
   /cc @eolivelli 





[GitHub] [pulsar] eolivelli commented on issue #11962: PIP 94: Message converter at broker level

eolivelli commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914943326


   Do we need to add a converter on the publish side?
   IIUC this converter is for the consumer.





[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915010526


   @wangjialing218 It could be an enhancement. I think we can submit another PIP for the design details. It sounds like a design in which each `PersistentTopic` has multiple `ManagedLedger`s and each `ManagedLedger` has its own message converter. But I think we can still use only one `ManagedLedger`, because `MessageConverter#accept` can distinguish whether the buffer needs to be converted.
   
   For example, in KoP, there's still a `MessageMetadata` with an `entry.format=kafka` property added before the original Kafka records. The overhead is not significant IMO, and we can easily distinguish a message sent by a Kafka client from one sent by a Pulsar client.
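   The check described here stays cheap because it only looks at metadata. A hypothetical sketch, assuming the `MessageMetadata` properties have already been parsed into a `Map` (real code would read them from the protobuf-encoded metadata, which is not modeled here); `ParsedEntry` and `KafkaFormatDetector` are illustrative names.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   
   // Hypothetical parsed view of an entry; in Pulsar the properties live in MessageMetadata.
   final class ParsedEntry {
       final Map<String, String> properties;
       final ByteBuffer payload;
   
       ParsedEntry(Map<String, String> properties, ByteBuffer payload) {
           this.properties = Map.copyOf(properties);
           this.payload = payload;
       }
   }
   
   final class KafkaFormatDetector {
       static final String ENTRY_FORMAT_KEY = "entry.format";
   
       // A metadata-only check: plain Pulsar entries (no marker) are left alone,
       // so the payload never has to be parsed to make the decision.
       static boolean accept(ParsedEntry entry) {
           return "kafka".equals(entry.properties.get(ENTRY_FORMAT_KEY));
       }
   }
   ```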
   
   @eolivelli BTW, I just found a problem with @wangjialing218's idea. If we passed `ServiceConfiguration` to `MessageConverter#init`, we would have to move the interface from the `pulsar-common` module to the `pulsar-broker-common` module, and then the `managed-ledger` module could not use the converter. Another problem is that we would have to define a class derived from `ServiceConfiguration`, as protocol handlers do. If we don't need access to the existing configurations in `ServiceConfiguration`, the parameter is unnecessary.
   
   Here is an example of a converter that needs to initialize some resources:
   
   ```java
   KeyMessageConverter converter = new KeyMessageConverter(remoteServerAddress);
   converter.init(); // request key from remote server once or periodically
   // call accept() and convert() using the dynamically changed key...
   converter.close(); // close the connection to key server
   ```
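   One way to give the converter a lifecycle without depending on `ServiceConfiguration` is to make it `AutoCloseable` with a no-argument `init()`, as in the example above. The sketch below is hypothetical: `LifecycleMessageConverter` and `XorKeyConverter` are made-up names, a `Supplier<Byte>` stands in for the remote key server, and XOR stands in for whatever transformation a real converter would apply.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.function.Supplier;
   
   // Hypothetical lifecycle-aware variant of the converter interface.
   interface LifecycleMessageConverter extends AutoCloseable {
       default void init() {
       }
   
       boolean accept(ByteBuffer buffer);
   
       ByteBuffer convert(ByteBuffer originalBuffer);
   
       @Override
       default void close() { // narrowed: no checked exception
       }
   }
   
   final class XorKeyConverter implements LifecycleMessageConverter {
       private final Supplier<Byte> keySource; // stand-in for the remote key server
       private Byte key;
       private boolean closed;
   
       XorKeyConverter(Supplier<Byte> keySource) {
           this.keySource = keySource;
       }
   
       @Override
       public void init() {
           key = keySource.get(); // fetch the key once; a real converter might refresh it
       }
   
       @Override
       public boolean accept(ByteBuffer buffer) {
           return key != null && !closed;
       }
   
       @Override
       public ByteBuffer convert(ByteBuffer originalBuffer) {
           if (!accept(originalBuffer)) {
               throw new IllegalStateException("converter is not initialized or already closed");
           }
           byte k = key;
           ByteBuffer source = originalBuffer.duplicate(); // leaves the caller's position untouched
           ByteBuffer converted = ByteBuffer.allocate(source.remaining());
           while (source.hasRemaining()) {
               converted.put((byte) (source.get() ^ k));
           }
           converted.flip();
           return converted;
       }
   
       @Override
       public void close() {
           closed = true; // a real converter would close its connection here
       }
   }
   ```
   
   Narrowing `close()` to throw no checked exception lets callers use try-with-resources without a catch block.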





[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914987691


   > we should define some constraints on what the Converter can do with the input ByteBuf. Should it keep the reader/writer indexes ? what about refcount ?
   > can the Converter modify the ByteBuf in place or should it always create a copy ?
   
   Yes. I'll add more description.
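   The thread does not settle these rules. As one possible contract (an assumption, not a decision from this discussion), a converter could be required to leave the input's indexes and contents untouched and return a separate buffer. A checker for that contract might look like this, using `ByteBuffer`'s position/limit as a stand-in for `ByteBuf`'s reader/writer indexes; reference counting has no `ByteBuffer` equivalent and is not modeled. `ConverterContract` is a hypothetical name.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.function.UnaryOperator;
   
   final class ConverterContract {
       // Runs `convert` and fails if it moved the input's indexes or wrote into the input.
       static ByteBuffer convertChecked(UnaryOperator<ByteBuffer> convert, ByteBuffer input) {
           int position = input.position();
           int limit = input.limit();
           ByteBuffer snapshot = ByteBuffer.allocate(input.remaining());
           snapshot.put(input.duplicate()); // copy the readable bytes without moving `input`
           snapshot.flip();
   
           ByteBuffer output = convert.apply(input);
   
           if (input.position() != position || input.limit() != limit) {
               throw new IllegalStateException("converter moved the input's indexes");
           }
           if (!snapshot.equals(input)) { // compares the remaining bytes only
               throw new IllegalStateException("converter modified the input in place");
           }
           return output;
       }
   }
   ```
   
   A check like this is cheap enough for unit tests but copies every buffer, so it would not belong on the dispatch path.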


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on issue #11962: PIP 94: Message converter at broker level

eolivelli commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915211835


   @codelipenghui 
   
   > if users want to consume the data with Kafka data format, they can add a separate dependency such as org.apache.pulsar:kafka-format-converter.
   
   This is not always possible, because downstream users may not be aware of the format of the data.
   It should be something like compression: if you want to do it in the client, you must put metadata in the message and let the client apply the converter depending on the format.
   
   BTW, in big enterprises it will be very hard to require adding a client-side plugin to every possible client.
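   The compression analogy suggests a client-side lookup keyed by a format marker in the message metadata, similar to how a consumer picks a decompression codec from the metadata. A hypothetical sketch: the `entry.format` key follows KoP's property, while `FormatRegistry` and the decoder map are illustrative and not part of any Pulsar client API.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import java.util.function.UnaryOperator;
   
   final class FormatRegistry {
       private final Map<String, UnaryOperator<ByteBuffer>> decoders;
   
       FormatRegistry(Map<String, UnaryOperator<ByteBuffer>> decoders) {
           this.decoders = Map.copyOf(decoders);
       }
   
       // Chooses a decoder from the message properties, the way a compression codec
       // is chosen from the message metadata.
       ByteBuffer decode(Map<String, String> properties, ByteBuffer payload) {
           String format = properties.getOrDefault("entry.format", "pulsar");
           if ("pulsar".equals(format)) {
               return payload; // already in the native format
           }
           UnaryOperator<ByteBuffer> decoder = decoders.get(format);
           if (decoder == null) {
               throw new IllegalStateException("No decoder installed for entry.format=" + format);
           }
           return decoder.apply(payload);
       }
   }
   ```
   
   A client without the matching decoder fails loudly instead of handing undecodable bytes to the application, which is exactly the deployment burden raised above: every client must carry the plugin.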


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915721441


   @codelipenghui 
   
   > In my opinion,
   >
   > 1. if users want to achieve high performance, they should avoid the broker-side data format conversion.
   > 2. If users do not care about the performance, they can use entry.format=pulsar, no need to add more dependencies at the client-side.
   
   It's right. But this proposal is mainly for **the first case**, not the second. What if the user switches to a Pulsar client later? Without the converter, they have to discard old messages. If that's not acceptable, they would never consider switching to a Pulsar client.
   
   > but if you have 100 subscriptions to read the data from the topic by Pulsar client, you might want to choose the entry.format=pulsar.
   
   It's right, because in this case users prefer to store messages in Pulsar format. However, as I've said, the message converter is mainly for the `entry.format=kafka` case, i.e. when users prefer to store messages in Kafka format but, in some cases, also need Pulsar consumers to read these messages.
   
   Both the `kafka` and `pulsar` entry formats put a burden on the broker, but we must ensure that interaction between Kafka clients and Pulsar clients works in both cases.
   
   > And If we are try to introduce the data format processor at the client-side again, users will have 3 options, entry.format for KoP, messageConverter for the broker, data format processor for the client-side. This will make it very complicated to use.
   
   1. `entry.format=pulsar` if you don't care about performance.
   2. `entry.format=kafka` for better performance among Kafka clients.
       If you still want to consume these messages with a Pulsar client, configure the message converter:
       1. at the client level, if you can upgrade your Pulsar client to >= 2.9.0;
       2. at the broker level, if you have older Pulsar clients that cannot be upgraded for some reason.
   
   We should cover all use cases.
   



[GitHub] [pulsar] eolivelli commented on issue #11962: PIP 94: Message converter at broker level

eolivelli commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914944145


   I would add an `init(ServerConfiguration configuration)` method and a `close()` method, so that the converter can be configured and can release its resources.
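Here is one way to read this suggestion, sketched in Java under stated assumptions: the converter gets an explicit `init`/`close` lifecycle. `LifecycleConverter`, `ExampleConverter` and the `converter.source.format` key are hypothetical names. `java.util.Properties` stands in for Pulsar's `ServerConfiguration` so the sketch compiles on its own. This is not the PIP 94 interface.

```java
import java.nio.ByteBuffer;
import java.util.Properties;

// Hypothetical converter contract with an explicit lifecycle.
// NOT the PIP 94 API; Properties stands in for ServerConfiguration.
interface LifecycleConverter extends AutoCloseable {
    // Called once when the broker loads the converter.
    void init(Properties config);

    // Converts one entry into a buffer that a Pulsar consumer can read.
    ByteBuffer convert(ByteBuffer entry);

    // Releases whatever init() acquired; narrowed to throw no checked exception.
    @Override
    void close();
}

// Toy implementation that only tracks its lifecycle state.
class ExampleConverter implements LifecycleConverter {
    private String sourceFormat;
    private boolean initialized;
    private boolean closed;

    @Override
    public void init(Properties config) {
        // "converter.source.format" is a made-up key for this sketch.
        sourceFormat = config.getProperty("converter.source.format", "kafka");
        initialized = true;
    }

    @Override
    public ByteBuffer convert(ByteBuffer entry) {
        if (!initialized || closed) {
            throw new IllegalStateException("converter is not active");
        }
        // Placeholder: a real converter would rewrite the bytes here.
        return entry.duplicate();
    }

    @Override
    public void close() {
        closed = true;
    }

    String sourceFormat() { return sourceFormat; }
    boolean isClosed() { return closed; }
}
```

Because `close()` here throws no checked exception, a caller could hold such a converter in a try-with-resources block, e.g. `try (ExampleConverter c = new ExampleConverter()) { c.init(props); ... }`.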



[GitHub] [pulsar] eolivelli commented on issue #11962: PIP 94: Message converter at broker level

eolivelli commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915830612


   > I'll open a new PIP to add message converter for Java client. For other client, maybe it needs extra efforts to do it.
   
   Makes sense to me.
   You could push this PIP to the wiki and tag it as "Rejected";
   that way it will remain as documentation for the future.
   
   cc @merlimat 



[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915296005


   @BewareMyPower 
   
   > No. It requires configuring the entry.format=pulsar and then the interact among Kafka clients could have a bad performance.
   
   Because we are converting Kafka-format data to Pulsar-format data, right? If so, what is the difference compared with converting Kafka-format data to Pulsar-format data on the consumption path?
   
   



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915738177


   @codelipenghui The default `entry.format` is `pulsar` only because, previously, there was no way for Pulsar clients and Kafka clients to interact when `entry.format` was `kafka`. https://github.com/streamnative/kop/pull/632 fixed part of this problem so that Kafka consumers can consume messages from Pulsar producers. But if we want Pulsar consumers to consume messages from Kafka producers, we need to fix the broker side. With PIP 94, things become easy, and `entry.format=pulsar` would be deprecated. As for the case where there are multiple subscriptions on the same topic, it's just a **bad vs. worse** comparison.
   
   > after we increase the burden of the broker, will this meet the user's performance requirements?
   
   It's still the priority trade-off I mentioned before: **availability vs. performance**. We should tell users **not to use older Pulsar clients.** I'm fine with either approach; we would just need to change PIP 94 to `Message converter at client level`. We would still have the converter, but it would only be added to the client's configuration.
   
   > If users set up incorrect format before, they should do the topic migration before we provided the solution.
   
   It's nearly impossible, because `entry.format` is a broker-level config. For example, if you have N messages written with `entry.format=pulsar`, you would have to read them all with a Pulsar consumer and **persist them somewhere**, then change `entry.format` to `kafka`, restart the broker, and **retrieve the messages and publish them to new topics**. Changing `entry.format` from `kafka` to `pulsar` requires the same procedure.



[GitHub] [pulsar] BewareMyPower edited a comment on issue #11962: PIP 94: Message converter at broker level

BewareMyPower edited a comment on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914970028


   @wangjialing218 The conversion is unavoidable: we have to sacrifice either the producer side or the consumer side. Before this proposal, if we wanted to support both Kafka and Pulsar clients, the conversions were:
   
   1. Pulsar producer -> BK
   2. Kafka producer -> convert to Pulsar format (by KoP) -> BK
   3. BK -> convert to Kafka format (by KoP) -> Kafka consumer
   4. BK -> Pulsar consumer
   
   After this proposal:
   1. Pulsar producer -> BK
   2. Kafka producer -> BK
   3. BK -> convert if necessary (by KoP) -> Kafka consumer
   4. BK -> convert if necessary (by MessageConverter) -> Pulsar consumer
   
   As for your second question:
   
   > If the topic has multi subscriptions, one message could be dispatched to multi consumers, does the covert of this message happen multi times in this case?
   
   Currently, yes. But I'm not sure whether the implementation can be improved. In my demo implementation I performed the conversion in `ServerCnx#newMessageAndIntercept`, but maybe it could be performed immediately after the entries are read.
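Below is a toy model of the two placements mentioned above. It assumes entries are plain strings and that a "conversion" just tags them. `DispatchModel` is a hypothetical name, not broker code. Converting in the per-consumer dispatch path, as the demo does in `ServerCnx#newMessageAndIntercept`, costs one conversion per entry per subscription. Converting once after the read and sharing the result costs one conversion per entry.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one read batch being dispatched to several subscriptions.
// Hypothetical code: entries are strings and "conversion" just wraps them.
class DispatchModel {
    int conversions = 0;

    private String convert(String entry) {
        conversions++;
        return "pulsar(" + entry + ")";
    }

    // Conversion in the per-consumer dispatch path:
    // each subscription converts each entry again.
    List<List<String>> convertPerDispatch(List<String> entries, int subscriptions) {
        List<List<String>> delivered = new ArrayList<>();
        for (int s = 0; s < subscriptions; s++) {
            List<String> out = new ArrayList<>();
            for (String e : entries) {
                out.add(convert(e));
            }
            delivered.add(out);
        }
        return delivered;
    }

    // Conversion right after the read: each entry is converted once and the
    // same converted list is shared by every subscription.
    List<List<String>> convertOnceAfterRead(List<String> entries, int subscriptions) {
        List<String> converted = new ArrayList<>();
        for (String e : entries) {
            converted.add(convert(e));
        }
        List<List<String>> delivered = new ArrayList<>();
        for (int s = 0; s < subscriptions; s++) {
            delivered.add(converted);
        }
        return delivered;
    }
}
```

The saving in the second method assumes the subscriptions share one read of the batch, for example through a cache of already-converted entries. Whether the broker's read path allows that is exactly the open question here.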



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915681440


   @codelipenghui The fact is that most existing KoP users prefer `entry.format=kafka`. If we changed `entry.format` from `kafka` to `pulsar`, all existing topics/data would become unavailable because the entries are all in Kafka format:
   - Pulsar consumers cannot recognize these entries without PIP 94.
   - KoP itself cannot recognize these entries because it would assume they are in Pulsar format, so Kafka consumers cannot receive them.
   
   Even setting these concerns aside, see the following table for a comparison.
   
   | Performance                       | entry.format=kafka | entry.format=pulsar |
   | --------------------------------- | ------------------ | ------------------- |
   | Kafka Producer & Kafka Consumer   | High               | Extremely low       |
   | Kafka Producer & Pulsar Consumer  | **Not Available**  | Low                 |
   | Pulsar Producer & Pulsar Consumer | High               | High                |
   | Pulsar Producer & Kafka Consumer  | Low                | Low                 |
   
   After PIP 94, the **Not Available** entry would become *Low*. The main difference is that, for every Kafka producer, `entry.format=pulsar` always performs a conversion, even if the consumers are also Kafka consumers, whereas `entry.format=kafka` performs conversions on the consumer side only when necessary.
   
   | Entry's format | Kafka Consumer | Pulsar Consumer |
   | -------------- | -------------- | --------------- |
   | Pulsar         | Conversion     | No conversion   |
   | Kafka          | No conversion  | Conversion      |
   
   After PIP 94, the **No conversion** cell in the first row would become **a check of the entry buffer**.
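This is a minimal sketch of what "a check of the entry buffer" could mean. It assumes a made-up layout where the first byte marks the entry's format; real Pulsar and Kafka entry formats look nothing like this. `CheckThenConvert` is a hypothetical helper. A Pulsar-format entry costs only a one-byte peek and is returned as-is. Only Kafka-format entries go through a (placeholder) conversion.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Made-up entry layout for illustration only: byte 0 is a format marker and
// the remaining bytes are the payload. Real entry formats are different.
final class CheckThenConvert {
    static final byte PULSAR_MARKER = 0x01;
    static final byte KAFKA_MARKER = 0x02;

    // Counts how many entries actually had to be converted.
    static int conversions = 0;

    // Cheap check: peek at the marker without moving the buffer's position.
    static boolean needsConversion(ByteBuffer entry) {
        return entry.hasRemaining() && entry.get(entry.position()) == KAFKA_MARKER;
    }

    // Returns a Pulsar-format entry untouched; builds a new buffer for a
    // Kafka-format one (here only the marker changes; a real converter
    // would re-encode the payload).
    static ByteBuffer maybeConvert(ByteBuffer entry) {
        if (!needsConversion(entry)) {
            return entry; // the old "no conversion" case is now just the check
        }
        conversions++;
        ByteBuffer payload = entry.duplicate();
        payload.position(payload.position() + 1); // skip the marker byte
        ByteBuffer out = ByteBuffer.allocate(payload.remaining() + 1);
        out.put(PULSAR_MARKER);
        out.put(payload);
        out.flip();
        return out;
    }

    // Builds an entry with the given marker and a UTF-8 payload.
    static ByteBuffer newEntry(byte marker, String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(bytes.length + 1);
        buf.put(marker);
        buf.put(bytes);
        buf.flip();
        return buf;
    }
}
```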



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914972162


   BTW, here is my demo implementation: https://github.com/BewareMyPower/pulsar/tree/bewaremypower/payload-converter (it will be deleted after the final implementation is pushed). The unit tests were not pushed, but I've added them in my local environment. The interface is also a little different.



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915758692


   > I accept both cases, just need to change PIP 94 to Message converter at client level, we still have the converter but it should only be added to client's config.
   
   I just thought of a problem. If we added the converter on the client side, only the Java client (>= 2.9.0) would be able to consume these messages; there would still be no way for other clients to consume them. We would need to write converters for the other language clients, which could take a lot of effort because we cannot reuse the Java classes.



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914950524


   > I would add some "init(ServerConfiguration configuration)" method and a close() method, in order to let it be configurable and to let it release resources.
   
   Thanks for your suggestion; it makes sense to me. When we need to load converters from other systems, the load and release phases are necessary.




[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915314070


   If we use `entry.format=pulsar`:
   
   - Kafka client publish: Kafka format -> Pulsar format
   - Kafka client consume: Pulsar format -> Kafka format (improved by streamnative/kop#673)
   - Pulsar client consume: Pulsar format -> Pulsar format
   
   After introducing the message converter and `entry.format=kafka`:
   
   - Kafka client publish: Kafka format -> Kafka format
   - Kafka client consume: Kafka format -> Kafka format
   - Pulsar client consume: Kafka format -> Pulsar format (by the converter)
   
   Is my understanding above correct, @BewareMyPower?
   
   When I said `the existing KoP implementation already supports this requirement`, I meant that with `entry.format=pulsar`, Kafka clients can consume data written by Pulsar clients, and Pulsar clients can consume data from Kafka clients, through some conversion on the broker side. I don't understand why the broker-side converter approach would have a great advantage over `entry.format=pulsar`.
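A rough way to compare the two options is to count conversions per topic. This sketch makes three assumptions. Every subscription reads every message once. Each read of an entry stored in the "other" format costs one conversion (per dispatch, as in the demo implementation). Pulsar-produced messages are stored in Pulsar format under both settings. `ConversionCount` is a hypothetical helper, and the figures are arithmetic, not measurements.

```java
// Conversion counts for one topic under the two storage choices in this
// thread. Simplifying assumptions, not measurements: every subscription
// reads every message once, and each read of an entry stored in the other
// format costs one conversion.
final class ConversionCount {
    // entry.format=pulsar: Kafka-produced messages are converted on publish,
    // and every Kafka subscription converts every stored entry back.
    // Pulsar subscriptions never convert, so pulsarSubs does not appear.
    static long pulsarFormat(long kafkaMsgs, long pulsarMsgs, long kafkaSubs, long pulsarSubs) {
        long onPublish = kafkaMsgs;
        long onKafkaConsume = (kafkaMsgs + pulsarMsgs) * kafkaSubs;
        return onPublish + onKafkaConsume;
    }

    // entry.format=kafka with a broker-side converter: nothing is converted on
    // publish; Kafka subscriptions convert only Pulsar-produced entries (KoP),
    // and Pulsar subscriptions convert only Kafka-produced entries (converter).
    static long kafkaFormatWithConverter(long kafkaMsgs, long pulsarMsgs, long kafkaSubs, long pulsarSubs) {
        return pulsarMsgs * kafkaSubs + kafkaMsgs * pulsarSubs;
    }
}
```

Take 1,000 Kafka-produced messages and one Kafka subscription. `pulsarFormat(1000, 0, 1, 0)` is 2,000, while `kafkaFormatWithConverter(1000, 0, 1, 0)` is 0. Adding one Pulsar subscription raises the converter side to 1,000. With 100 Pulsar subscriptions it becomes 100,000 against 2,000, which is the multi-subscription case raised earlier in the thread.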



[GitHub] [pulsar] BewareMyPower edited a comment on issue #11962: PIP 94: Message converter at broker level

BewareMyPower edited a comment on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915274820


   @codelipenghui See https://github.com/streamnative/kop/pull/673 for more details about the GC problem. In that PR, I used the direct memory allocator, which fixed the GC problem. The remaining heap memory usage is a KoP-side problem, not related to data format conversion. Even before that PR, deserializing Kafka records didn't use heap memory.
   
   1. Kafka records to Pulsar message
      - Read records (no memory allocated)
      - Create Pulsar message (memory allocated from direct memory)
   2. Pulsar message to Kafka records
      - Read message (no memory allocated)
      - Create Kafka records (the default `MemoryRecords` allocates from heap memory, which triggers GC; this problem was solved by https://github.com/streamnative/kop/pull/673)
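The distinction above comes down to where a conversion's output buffer is allocated. This sketch uses `java.nio.ByteBuffer` as a stand-in for the Netty `ByteBuf` allocators that KoP actually uses, and `OutputAllocation` is a hypothetical name. A heap buffer is backed by a `byte[]` that the garbage collector has to manage. A direct buffer keeps its contents outside the Java heap.

```java
import java.nio.ByteBuffer;

// Copies an encoded record into a newly allocated output buffer.
// java.nio.ByteBuffer stands in for Netty's ByteBuf; this does not
// reproduce KoP's actual allocation code.
final class OutputAllocation {
    // Heap buffer: contents live in a byte[] on the Java heap.
    static ByteBuffer copyToHeap(ByteBuffer src) {
        ByteBuffer out = ByteBuffer.allocate(src.remaining());
        out.put(src.duplicate()); // duplicate() leaves src's position untouched
        out.flip();
        return out;
    }

    // Direct buffer: contents live outside the Java heap, so large volumes
    // of converted messages do not occupy heap space for their bytes.
    static ByteBuffer copyToDirect(ByteBuffer src) {
        ByteBuffer out = ByteBuffer.allocateDirect(src.remaining());
        out.put(src.duplicate());
        out.flip();
        return out;
    }
}
```

Allocating direct memory is comparatively expensive, which is why pooled allocators (such as Netty's) are typically used instead of calling `allocateDirect` for every message.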
   
   



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915683801


   If a user only has Kafka clients for now, they can choose `entry.format=kafka` for high performance. If one day they switch to Pulsar clients, they can configure message converters to consume the old data.
   
   If a user already has Kafka clients and Pulsar clients on the same topics, without PIP 94 they must choose `entry.format=pulsar` now. In the future, when they eliminate all Kafka clients, even including KoP itself, they'll find there's no way to consume the old data.



[GitHub] [pulsar] wangjialing218 commented on issue #11962: PIP 94: Message converter at broker level

wangjialing218 commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915728830


   @codelipenghui
   > I think it's not the same storage as the PIP-94 right? PIP 94 is converting the data encoding format, it will not touch the user's data, if you want to convert the user's data, the broker needs to deserialize data which will bring more GC workload on the broker. Yes do data conversion on the broker side can reduce the network workload, but increase the CPU workload, the burden of JVM GC, you might get a more unstable broker.
   
   Yes, I mean converting the user's data on the broker side. Thanks for your advice; I'll consider the disadvantages and see if there is a solution.
   
   > 
   > But there are also many disadvantages, each managed ledger we need to maintain the metadata, more data copies, more entries write to bookies.
   
   Could we record the primary ledger ID in the other managed ledgers' metadata and write only one copy of the data to bookies? If that data is lost, we could read it from the primary ledger (which has multiple copies) using the same entry ID and perform the conversion again.
   



[GitHub] [pulsar] BewareMyPower closed issue #11962: PIP 94: Message converter at broker level

BewareMyPower closed issue #11962:
URL: https://github.com/apache/pulsar/issues/11962


   



[GitHub] [pulsar] BewareMyPower commented on issue #11962: PIP 94: Message converter at broker level

BewareMyPower commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915035889


   I've updated the `MessageConverter` API and added the validator to the implementation. PTAL again, @eolivelli.



[GitHub] [pulsar] codelipenghui commented on issue #11962: PIP 94: Message converter at broker level

codelipenghui commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-915208660


   @BewareMyPower 
   > Once the converter was configured, before dispatching messages to Pulsar consumers, the converter would check if the buffer needs to be converted and then perform the conversion if necessary. 
   
   We should avoid converting (serializing and deserializing) data on the broker side, because this puts a very heavy burden on the broker's GC. In my opinion, we should do the data conversion on the client side. We can have diverse data format implementations, and by default the Pulsar client only has the Pulsar data format processor. If users want to consume data in Kafka format, they can add a separate dependency such as `org.apache.pulsar:kafka-format-converter`.
   
   Data sent to Kafka clients should not be the Pulsar broker's concern; KoP should handle it. If the data is in Kafka format, KoP can send it directly to the Kafka consumer; if it is in Pulsar format, KoP needs to convert it to Kafka format.
   
   For the storage layer (BookKeeper and tiered storage), data might be read directly, bypassing the broker, e.g. by PulsarSQL or (in the future) Flink. This also needs to be considered if we do the data conversion on the broker side: we might need another implementation to read data in multiple formats from BookKeeper/tiered storage.
   
   @wangjialing218 
    > Just a idea. Current there is one ManagedLedger(ledger) associated with PersistentTopic. Could we add another ManagedLedger(kafkaLedger) associated with the topic.
   
   We can't use multiple managed ledgers for a topic: this would break FIFO semantics when Kafka producers and Pulsar producers publish data to the topic while Kafka consumers and Pulsar consumers consume from it.
   And to read data repeatedly, we need to guarantee the same reading order each time.
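Here is a small illustration of the FIFO problem. It assumes message names that encode their producer ("K" for Kafka, "P" for Pulsar), and `SplitLogOrder` is a hypothetical helper. With one log, readers see the arrival order. With two logs, each log is ordered internally, but the interleaving between them is lost. Draining one log and then the other is just the simplest reader. Any merge has the same problem unless the logs also record a global sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Shows why splitting one topic across two logs loses the topic's FIFO order.
final class SplitLogOrder {
    // Single log: every message is appended in arrival order.
    static List<String> singleLog(List<String> arrivals) {
        return new ArrayList<>(arrivals);
    }

    // Two logs: Kafka-produced messages ("K...") go to one log and
    // Pulsar-produced messages ("P...") to the other. Reading one log and
    // then the other keeps each log's order but not the original interleaving.
    static List<String> readSplitLogs(List<String> arrivals) {
        List<String> pulsarLog = new ArrayList<>();
        List<String> kafkaLog = new ArrayList<>();
        for (String m : arrivals) {
            if (m.startsWith("K")) {
                kafkaLog.add(m);
            } else {
                pulsarLog.add(m);
            }
        }
        List<String> read = new ArrayList<>(pulsarLog);
        read.addAll(kafkaLog);
        return read;
    }
}
```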
   
   @eolivelli 
   > do we need do add a Converter on the Publish side ?
   
   If KoP wants to convert data on the publishing path, KoP (or other protocol handlers) can implement that directly; is there any reason to introduce a converter in the broker for data publishing? And I think that if KoP uses the Kafka format, KoP will, for now, convert the data on the publish side, which is inefficient.



[GitHub] [pulsar] wangjialing218 commented on issue #11962: PIP 94: Message converter at broker level

wangjialing218 commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914995316


   @BewareMyPower Just an idea: currently there is one ManagedLedger (`ledger`) associated with `PersistentTopic`. Could we add another ManagedLedger (`kafkaLedger`) associated with the topic?
   1. Pulsar producer -> ledger
   2. Kafka producer -> kafkaLedger
   3. ledger -> Pulsar consumer
   4. kafkaLedger -> Kafka consumer
   5. ledger -> convert from Pulsar format to Kafka format -> kafkaLedger
   6. kafkaLedger -> convert from Kafka format to Pulsar format -> ledger
   
   Steps 5 and 6 run as background tasks, so each conversion is done only once, asynchronously. This costs more disk space but improves message throughput.



[GitHub] [pulsar] eolivelli commented on issue #11962: PIP 94: Message converter at broker level

eolivelli commented on issue #11962:
URL: https://github.com/apache/pulsar/issues/11962#issuecomment-914943889


   We should define some constraints on what the Converter can do with the input ByteBuf. Should it preserve the reader/writer indexes? What about the refcount?
   Can the Converter modify the ByteBuf in place, or should it always create a copy?
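Here is one possible answer to these questions, sketched with `java.nio.ByteBuffer` in place of Netty's `ByteBuf`, so reference counting cannot be shown. The converter reads through a read-only duplicate, which leaves the caller's position and limit (Netty's reader/writer indexes) untouched. It never modifies the input and always returns a new buffer. `NonMutatingConverter` and its upper-casing "conversion" are placeholders, not a proposed API.

```java
import java.nio.ByteBuffer;

// One possible input-buffer contract, shown with java.nio.ByteBuffer:
//   1. the caller's position/limit (Netty: reader/writer indexes) stay as-is;
//   2. the input bytes are never modified in place;
//   3. the result is always a new buffer that the caller owns.
final class NonMutatingConverter {
    // Placeholder "conversion": upper-cases ASCII letters into a fresh buffer.
    static ByteBuffer convert(ByteBuffer input) {
        // Read-only duplicate: own position/limit, shared content, no writes.
        ByteBuffer view = input.asReadOnlyBuffer();
        ByteBuffer out = ByteBuffer.allocate(view.remaining());
        while (view.hasRemaining()) {
            byte b = view.get();
            out.put(b >= 'a' && b <= 'z' ? (byte) (b - ('a' - 'A')) : b);
        }
        out.flip();
        return out;
    }
}
```

Always copying costs an extra allocation per entry. Modifying in place could be risky, though, if the same entry buffer is also handed to other readers, such as another subscription or KoP serving a Kafka consumer.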

