Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2015/01/16 20:58:54 UTC

Re: Kafka sending messages with zero copy

Hi Rajiv,

Thanks for this proposal, it would be great if you can upload some
implementation patch for the CAS idea and show some memory usage / perf
differences.

Guozhang

On Sun, Dec 14, 2014 at 9:27 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Resuscitating this thread. I've done some more experiments and profiling.
> My messages are very tiny (currently 25 bytes each) and creating
> multiple objects per message leads to a lot of churn. The memory churn
> through creation of convenience objects is more than the memory being used
> by my objects right now. I could probably batch my messages further, to
> make this effect less pronounced. I did some rather unscientific
> experiments with a flyweight approach on top of the ByteBuffer for a simple
> messaging API (peer to peer NIO based so not a real comparison) and the
> numbers were very satisfactory and there is no garbage created in steady
> state at all. Though I don't expect such good numbers from actually going
> through the broker + all the other extra stuff that a real producer would
> do, I think there is great potential here.
>
> The general mechanism for me is this:
> i) A buffer (I used Unsafe but I imagine ByteBuffer having similar
> performance) is created per partition.
> ii) A CAS loop (in Java 7 and earlier) or, even better,
> unsafe.getAndAddInt() in Java 8 can be used to claim a chunk of bytes on
> the per partition buffer. This code can be invoked from multiple threads
> in a wait-free manner (wait-free in Java 8, since getAndAddInt() is
> wait-free). Once a region in the buffer
> is claimed, it can be operated on using the flyweight method that we talked
> about. If the buffer doesn't have enough space then we can drop the message
> or move on to a new buffer. Further, this creates absolutely zero objects in
> steady state (only a few objects are created in the beginning). Even if the
> flyweight method is not desired, the API can just take byte arrays or
> objects that need to be serialized and copy them onto the per partition
> buffers in a similar way. This API has been validated in Aeron too, so I am
> pretty
> confident that it will work well. For the zero copy technique here is a
> link to Aeron API with zero copy -
> https://github.com/real-logic/Aeron/issues/18. The regular one copies byte
> arrays but without any object creation.
> iii) The producer send thread can then just go in FIFO order through the
> buffer sending messages that have been committed using NIO to rotate
> between brokers. We might need a background thread to zero out used buffers
> too.
>
> I've left out some details, but again none of this is very revolutionary -
> it's mostly the same techniques used in Aeron. I really think that we can
> keep the API garbage free and wait-free (even in the multi producer case)
> without compromising how pretty it looks - the total zero copy API will be
> low level, but it should only be used by advanced users. Moreover the usual
> producer.send(msg, topic, partition) can use the efficient ByteBuffer
> offset API internally without itself creating any garbage. With the
> technique I talked about there is no need for an intermediate queue of any
> kind since the underlying ByteBuffer per partition acts as the queue.
>
> I can do more experiments with some real producer code instead of my toy
> code to further validate the idea, but I am pretty sure that both
> throughput and jitter characteristics will improve thanks to lower
> contention (wait-free in Java 8 with a single getAndAddInt() operation for
> sync) and better cache locality (C-like buffers and a small, constant
> number of objects per partition). If you guys are interested, I'd love to
> talk more. Again, just to reiterate, I don't think the API will suffer at
> all - most of this can be done under the covers. Additionally it will open
> up things so that a low level zero copy API is possible.
>
> Thanks,
> Rajiv
>
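
A minimal sketch of the claim step described in (ii) of the quoted message,
under stated assumptions: it uses java.util.concurrent.atomic.AtomicInteger
in place of Unsafe, and the class and method names (PartitionBuffer, claim,
claimWithCas) are hypothetical, not part of any Kafka or Aeron API. It only
shows how multiple threads could reserve disjoint regions of one buffer.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-partition append buffer (illustrative names only). */
final class PartitionBuffer {
    private final ByteBuffer buffer;
    // Next free byte offset; shared by all producer threads.
    private final AtomicInteger tail = new AtomicInteger(0);

    PartitionBuffer(int capacity) {
        this.buffer = ByteBuffer.allocateDirect(capacity);
    }

    /**
     * Claims `length` bytes with a single fetch-and-add.
     * Returns the start offset, or -1 if the region does not fit.
     * Note: a failed claim still advances the tail, so later claims on this
     * buffer also fail; a real implementation would rotate to a new buffer.
     */
    int claim(int length) {
        int start = tail.getAndAdd(length);
        if (start < 0 || start > buffer.capacity() - length) {
            return -1;
        }
        return start;
    }

    /**
     * CAS-loop variant: retries on contention, never moves the tail past
     * capacity, so a smaller message may still fit after a failed claim.
     */
    int claimWithCas(int length) {
        while (true) {
            int start = tail.get();
            if (start > buffer.capacity() - length) {
                return -1;
            }
            if (tail.compareAndSet(start, start + length)) {
                return start;
            }
        }
    }

    /** Copies bytes into a claimed region using absolute (position-free) puts. */
    void putBytes(int offset, byte[] src) {
        for (int i = 0; i < src.length; i++) {
            buffer.put(offset + i, src[i]);
        }
    }

    byte getByte(int offset) {
        return buffer.get(offset);
    }
}
```

Absolute puts are used so that writers never touch the buffer's shared
position field; each thread writes only inside the region it claimed.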



-- 
-- Guozhang

Re: Kafka sending messages with zero copy

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Rajiv, looking forward to your prototype.

Guozhang

On Mon, Jan 26, 2015 at 2:30 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Guozhang,
>
> I am a bit busy at work. When I get the chance I'll definitely try to get a
> proof of concept going. Not the Kafka protocol, but just the buffering and
> threading structures, maybe just write to another socket. I think it would
> be useful just to get the queueing and buffer management going and prove
> that it can be done in a zero copy way in a multi producer single consumer
> environment. If that is working, then the single consumer can be the Kafka
> network sync thread.
>
> Thanks,
> Rajiv



-- 
-- Guozhang

Re: Kafka sending messages with zero copy

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Hi Guozhang,

I am a bit busy at work. When I get the chance I'll definitely try to get a
proof of concept going. Not the Kafka protocol, but just the buffering and
threading structures, maybe just write to another socket. I think it would
be useful just to get the queueing and buffer management going and prove
that it can be done in a zero copy way in a multi producer single consumer
environment. If that is working, then the single consumer can be the Kafka
network sync thread.
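
To make the multi producer single consumer idea concrete, here is a rough
sketch and not the actual proof of concept. Assumptions: it stores int words
in an AtomicIntegerArray instead of raw bytes (so the commit has clear
visibility semantics), the name MpscLog and its methods are made up for
illustration, and it does not wrap around or zero out used space.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.IntConsumer;

/** Hypothetical append-only log: many producers, one consumer. */
final class MpscLog {
    // Record layout: [header = record size in words][payload words...].
    // A header of 0 means "not committed yet".
    private final AtomicIntegerArray words;
    private final AtomicInteger tail = new AtomicInteger(0);
    private int head = 0; // touched by the single consumer thread only

    MpscLog(int capacityWords) {
        this.words = new AtomicIntegerArray(capacityWords);
    }

    /** Producer side: claim space, write the payload, then commit. */
    boolean offer(int[] payload) {
        int needed = payload.length + 1;
        int start;
        do {
            start = tail.get();
            if (start > words.length() - needed) {
                return false; // full: caller may drop or switch buffers
            }
        } while (!tail.compareAndSet(start, start + needed));

        for (int i = 0; i < payload.length; i++) {
            words.lazySet(start + 1 + i, payload[i]);
        }
        // Volatile write of the header publishes the payload writes above.
        words.set(start, needed);
        return true;
    }

    /** Consumer side: hand committed records to the sink in FIFO order. */
    int drain(IntConsumer sink) {
        int records = 0;
        while (head < words.length()) {
            int size = words.get(head);
            if (size == 0) {
                break; // next record is not committed yet (or log is empty)
            }
            for (int i = 1; i < size; i++) {
                sink.accept(words.get(head + i));
            }
            head += size;
            records++;
        }
        return records;
    }
}
```

The consumer stops at the first uncommitted header, so a slow producer
delays the records claimed after it; that is the price of strict FIFO order
in this sketch.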

Thanks,
Rajiv
