Posted to jira@kafka.apache.org by "Kirill Rodionov (Jira)" <ji...@apache.org> on 2021/04/21 19:55:00 UTC

[jira] [Comment Edited] (KAFKA-5761) Serializer API should support ByteBuffer

    [ https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326876#comment-17326876 ] 

Kirill Rodionov edited comment on KAFKA-5761 at 4/21/21, 7:54 PM:
------------------------------------------------------------------

If we're talking about pooled/reused buffers (like Netty's ByteBuf), then allocating them in the serializer is not going to achieve much: the data is copied to the accumulator (in KafkaProducer.doSend) after the serializer is invoked, and there is no way to release the buffer back to the pool other than in a finalize() method.

It makes more sense to support ByteBuffer explicitly as a value, so that user code can allocate the buffer before calling producer.send() and release it afterwards, since it is guaranteed that the contents have been copied to the producer's accumulator by then.
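As a minimal sketch of that lifecycle (all names here are illustrative stand-ins, not real Kafka APIs): acquire a buffer from a pool, let the send path copy it into the accumulator, then release it back to the pool.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical buffer pool, for illustration only.
final class BufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    BufferPool(int bufferSize) { this.bufferSize = bufferSize; }

    ByteBuffer acquire() {
        ByteBuffer b = free.poll();
        return b != null ? b : ByteBuffer.allocate(bufferSize);
    }

    void release(ByteBuffer b) { b.clear(); free.push(b); }
}

public class PooledSendSketch {
    // Stand-in for the copy KafkaProducer.doSend performs into its
    // accumulator; once this returns, the caller's buffer is no longer
    // referenced by the producer.
    static byte[] appendToAccumulator(ByteBuffer value) {
        byte[] copy = new byte[value.remaining()];
        value.get(copy);
        return copy;
    }

    public static void main(String[] args) {
        BufferPool pool = new BufferPool(1024);

        ByteBuffer buf = pool.acquire();
        buf.put("payload".getBytes()).flip();

        byte[] accumulated = appendToAccumulator(buf); // synchronous copy, as in send()
        pool.release(buf);                             // safe: contents already copied

        System.out.println(new String(accumulated));   // prints "payload"
    }
}
```

The point of the sketch is the ordering: because the copy is synchronous within send(), the buffer can be returned to the pool immediately afterwards, with no finalize() involved.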

The only hiccup here is that the "value.serializer" property is mandatory and its presence is checked at KafkaProducer construction time, when there is no value present yet and therefore no way to know whether it is going to be a ByteBuffer.

If you make the value.serializer property optional, then an error becomes possible on the first send() invocation if the value happens to be anything other than a ByteBuffer.

 

The other option is to keep the value serializer mandatory, as it is now, but ignore it when the value is a ByteBuffer, whose contents can be copied without any serializer.

valueBytes would have to be removed from the Partitioner's signature (no implementation I know of uses that argument anyway).
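A sketch of that second option, with a stand-in for Kafka's Serializer interface (the dispatch logic is an assumption about how the producer might implement it, not existing Kafka behavior):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializerBypassSketch {
    // Minimal stand-in for org.apache.kafka.common.serialization.Serializer<T>.
    interface Serializer<T> { byte[] serialize(String topic, T data); }

    // Proposed rule: if the value is already a ByteBuffer, copy its
    // contents directly and skip the configured serializer entirely.
    static <T> byte[] toBytes(String topic, T value, Serializer<T> configured) {
        if (value instanceof ByteBuffer) {
            ByteBuffer buf = ((ByteBuffer) value).duplicate(); // don't disturb caller's position
            byte[] out = new byte[buf.remaining()];
            buf.get(out);
            return out;
        }
        return configured.serialize(topic, value);
    }

    public static void main(String[] args) {
        Serializer<Object> strings =
            (topic, data) -> data.toString().getBytes(StandardCharsets.UTF_8);

        byte[] viaSerializer = toBytes("t", "hello", strings);
        byte[] direct = toBytes("t",
                ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), strings);

        System.out.println(new String(viaSerializer).equals(new String(direct))); // prints "true"
    }
}
```

With this shape, value.serializer stays mandatory and the configured serializer is simply never consulted for ByteBuffer values, so construction-time validation is unaffected.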

 

WDYT?



> Serializer API should support ByteBuffer
> ----------------------------------------
>
>                 Key: KAFKA-5761
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5761
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.11.0.0
>            Reporter: Bhaskar Gollapudi
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>              Labels: features, performance
>
> Consider the Serializer : Its main method is :
> byte[] serialize(String topic, T data);
> Producer applications create an implementation that takes in an instance
> (of T) and converts it to a byte[]. This byte array is allocated anew for
> each message. The byte array is then handed over to Kafka Producer API
> internals that write the bytes to the buffer/network socket. When the next
> message arrives, the serializer should try to reuse the existing byte[]
> instead of creating a new one. This requires two things:
> 1. The process of handing off the bytes to the buffer/socket and reusing
> the byte[] must happen on the same thread.
> 2. There should be a way of marking the end of the available bytes in the
> byte[].
> The first is reasonably simple to understand. If this does not happen, and
> without other necessary synchronization, the byte[] gets corrupted and so
> does the message written to the buffer/socket. However, this requirement
> is easy to meet for a producer application, because it controls the
> threads on which the serializer is invoked.
> The second is where the problem lies with the current API. It does not
> allow a variable number of bytes to be read from a container; it is
> limited by the byte[]'s length. This forces the producer to
> 1. either create a new byte[] for a message that is bigger than the
> previous one,
> OR
> 2. decide on a max size and use padding.
> Both are cumbersome and error prone, and may waste network bandwidth.
> Instead, if there is a Serializer with this method:
> ByteBuffer serialize(String topic, T data);
> it helps to implement a reusable bytes container so that clients can
> avoid allocations for each message.
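The reuse pattern described in the quoted report can be sketched as follows; the interface name is hypothetical (a variant of the Serializer API returning ByteBuffer, as proposed), not an actual Kafka interface:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReusableBufferSerializerSketch {
    // Hypothetical ByteBuffer-returning serializer, per the proposal.
    interface BufferSerializer<T> { ByteBuffer serialize(String topic, T data); }

    // A String serializer that reuses one backing buffer across calls,
    // growing it only when a message exceeds the current capacity.
    // Safe only if serialize() and consumption of the returned buffer
    // happen on the same thread (requirement 1 above).
    static final class ReusingStringSerializer implements BufferSerializer<String> {
        private ByteBuffer buffer = ByteBuffer.allocate(64);

        @Override
        public ByteBuffer serialize(String topic, String data) {
            byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > buffer.capacity()) {
                buffer = ByteBuffer.allocate(bytes.length); // grow in the rare oversized case
            }
            buffer.clear();
            buffer.put(bytes);
            buffer.flip(); // the limit marks the end of the valid bytes (requirement 2)
            return buffer;
        }
    }

    public static void main(String[] args) {
        ReusingStringSerializer s = new ReusingStringSerializer();
        ByteBuffer a = s.serialize("t", "first");
        System.out.println(a.remaining());   // prints 5
        ByteBuffer b = s.serialize("t", "second-msg");
        System.out.println(b.remaining());   // prints 10
        System.out.println(a == b);          // prints "true": same buffer reused
    }
}
```

The ByteBuffer's position/limit carry the "end of available bytes" that a bare byte[] cannot, which is exactly the gap the report identifies.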



--
This message was sent by Atlassian Jira
(v8.3.4#803005)