Posted to users@kafka.apache.org by Rajiv Kurian <ra...@signalfuse.com> on 2015/03/20 03:18:15 UTC

Kafka 0.9 consumer API

Is there a link to the proposed new consumer non-blocking API?

Thanks,
Rajiv

Re: Kafka 0.9 consumer API

Posted by Jay Kreps <ja...@gmail.com>.
:-)

On Thursday, March 19, 2015, James Cheng <jc...@tivo.com> wrote:

> Those are pretty much the best javadocs I've ever seen. :)
>
> Nice job, Kafka team.
>
> -James
>
> > On Mar 19, 2015, at 9:40 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >
> > Err, here:
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >
> > -Jay
> >
> > On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >
> >> The current work in progress is documented here:
> >>
> >>
> >> On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <rajiv@signalfuse.com>
> >> wrote:
> >>
> >>> Is there a link to the proposed new consumer non-blocking API?
> >>>
> >>> Thanks,
> >>> Rajiv
> >>>
> >>
> >>
>
>

Re: Kafka 0.9 consumer API

Posted by James Cheng <jc...@tivo.com>.
Those are pretty much the best javadocs I've ever seen. :)

Nice job, Kafka team.

-James

> On Mar 19, 2015, at 9:40 PM, Jay Kreps <ja...@gmail.com> wrote:
> 
> Err, here:
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> 
> -Jay
> 
> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com> wrote:
> 
>> The current work in progress is documented here:
>> 
>> 
>> On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com>
>> wrote:
>> 
>>> Is there a link to the proposed new consumer non-blocking API?
>>> 
>>> Thanks,
>>> Rajiv
>>> 
>> 
>> 


Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Thanks Guozhang,

I am currently working on a project at my company where I process
data from Kafka. The data is all tiny Kafka messages (25-35 bytes) and so
far we were bottlenecked on our processing speed. Recently we have made
significant improvements and our processing speed has gone up to millions of
data points per second per thread. So once this is deployed our bottleneck
will shift to how fast we can ingest data on a single node. Once we've
deployed this new and improved application, I think the higher-ups will
want me to work on the Kafka ingest efficiency/throughput. I'll try to get
a prototype going when that time comes. I have bookmarked the bugs and this
email and will add more points.

Thanks,
Rajiv

On Thu, Mar 26, 2015 at 3:06 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Rajiv,
>
> Those are good points. As for implementation, we have developed a class in
> the producer that can probably be re-used for the consumer as well.
>
> org.apache.kafka.clients.producer.internals.BufferPool
>
> Please feel free to add more comments on KAFKA-2045.
>
> Guozhang
>
>
> On Tue, Mar 24, 2015 at 12:21 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Yeah the main motivation is to not require de-serialization but still
> allow
> > the consumer to de-serialize into objects if they really want to. Another
> > motivation for iterating over the ByteBuffer on the fly is that we can
> > prevent copies all together. This has an added implication though. Since
> we
> > re-use the same response buffer continuously and iterate over it, this
> > means that the iteration has to be done in a single threaded manner
> > specifically on the thread that did the poll. The work flow will look a
> bit
> > like this:
> >
> > i) Re-use the same request buffer to create a request and write to the
> > socket.
> > ii) On poll re-use the same response buffer to read in the response till
> > it is complete.
> > iii) When the response is complete respond with an iterator to the
> > response ByteBuffer. The consumer must now consume the entire ByteBuffer
> > on this thread since we use a single mutable iterator to go through the
> > ByteBuffer.
> >
> > It is trickier when we consider that during iteration the consumer might
> > send more kafka requests and call poll further. We can maintain a pointer
> > to the end of the response ByteBuffer to allow more responses to stream
> > in and essentially use the response buffer as a circular buffer. So
> > basically there would be two pointers - iteration_pointer and
> > network_append_pointer, with iteration_pointer <=
> > network_append_pointer. If the network_append_pointer reaches the end of
> > the buffer, then we cannot accept any more data without the
> > iteration_pointer having progressed further. Since responses have to be
> > contiguous, we cannot have a response straddle the circular buffer's end.
> > We'd have to detect this case and copy the response to the
> > beginning of the buffer so that it is contiguous. Though this is a bit
> > more complicated, it ensures that we don't have to move in lockstep and
> > can pipeline requests/responses. I have written something like this for
> > another application and since this is all happening in a single thread it
> > is a bit easier and I think possible.
> >
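The two-pointer scheme described above can be sketched roughly as follows. This is only an illustration under assumptions: the class name ResponseRing and its methods are hypothetical and not part of Kafka, each response is stored with a 4-byte length prefix, and whole responses are appended at once (so a response that would straddle the end is simply placed at the front rather than copied there after the fact).

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: a single-threaded ring of length-prefixed, contiguous frames. */
final class ResponseRing {
    private final ByteBuffer buf;
    private int readPos;      // plays the role of iteration_pointer
    private int writePos;     // plays the role of network_append_pointer
    private int wrapAt = -1;  // end of valid tail data once the writer has wrapped; -1 otherwise

    ResponseRing(int capacity) {
        buf = ByteBuffer.allocate(capacity);
    }

    /** Appends one complete frame contiguously; returns false if there is no room yet. */
    boolean offer(byte[] frame) {
        int need = 4 + frame.length;
        if (wrapAt < 0 && readPos == writePos) {
            readPos = 0;   // everything consumed: restart at the front
            writePos = 0;
        }
        if (wrapAt < 0) {
            if (buf.capacity() - writePos < need) {
                // The frame would straddle the end, so place it at the front instead,
                // but only if the reader has already freed enough space there.
                if (readPos <= need) {
                    return false;
                }
                wrapAt = writePos;
                writePos = 0;
            }
        } else if (readPos - writePos <= need) {
            return false;  // the writer is behind the reader and must not overtake it
        }
        buf.putInt(writePos, frame.length);
        for (int i = 0; i < frame.length; i++) {
            buf.put(writePos + 4 + i, frame[i]);
        }
        writePos += need;
        return true;
    }

    /** Returns a view over the next unread frame, or null if nothing is buffered. */
    ByteBuffer poll() {
        if (wrapAt >= 0 && readPos == wrapAt) {
            readPos = 0;   // tail fully consumed: follow the writer to the front
            wrapAt = -1;
        }
        if (wrapAt < 0 && readPos == writePos) {
            return null;
        }
        int length = buf.getInt(readPos);
        ByteBuffer view = buf.duplicate();  // allocates a small view; a real version would reuse one
        view.limit(readPos + 4 + length);
        view.position(readPos + 4);
        readPos += 4 + length;
        return view.slice();
    }
}
```

A caller would offer() each complete response as it arrives and poll() on the same thread; a false return from offer() means the iterator has to make progress before more data can be accepted.
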
> > Like Jay suggested if application level deserialization is a bottleneck
> > that needs to be solved by passing slices of these ByteBuffers out to a
> > pool of threads, then this approach WILL NOT work since we expect the
> > ByteBuffer to be linearly iterated in one go. If we want slices to be
> > passed to a pool of threads then probably copying to new ByteBuffers is
> the
> > only good option. For my application that is definitely not the case
> since
> > deserialization is free and the cache friendliness of iterating over a
> hot
> > buffer trumps every other factor. But for applications with more involved
> > serialization we can re-use the above framework. The more low level
> > iterator (discussed in paragraph 2 and 3) can be wrapped in a higher
> level
> > iterator that just copies the bytes for each message to a new ByteBuffer
> > and hands them over. The low level iterator is still responsible for
> buffer
> > management and the higher level iterator is just a plain old consumer
> that
> > consumes the bytes by copying them over to a new ByteBuffer and handing
> > them to the application. The application is now free to transfer these to
> > other threads for processing/deserialization.
> >
> > Thanks for creating KAFKA-1326. Let me know what your thoughts are on
> this
> > proposed design?
> >
> > Thanks,
> > Rajiv
> >
> >
> > On Tue, Mar 24, 2015 at 9:22 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hi Rajiv,
> > >
> > > Just want to clarify, that the main motivation for iterating over the
> > byte
> > > buffer directly instead of iterating over the records is for not
> > enforcing
> > > de-serialization, right? I think that can be done by passing the
> > > deserializer class info into the consumer record instead of putting the
> > > de-serialized objects into the consumer, and only do de-serialization
> > > on-the-fly when key/value fields are requested while exposing another
> API
> > > to return the underlying raw bytes from the record. What do you think?
> > >
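The "deserialize only when key/value is requested, but also expose the raw bytes" idea could look something like the sketch below. It is an illustration only: LazyRecord is a hypothetical class, and a plain java.util.function.Function stands in for the consumer's deserializer so that the example has no Kafka dependency.

```java
import java.util.function.Function;

/** Hypothetical record that keeps the raw bytes and deserializes only on demand. */
final class LazyRecord<V> {
    private final byte[] rawValue;
    private final Function<byte[], V> deserializer;  // stand-in for a real deserializer
    private V cached;
    private boolean decoded;

    LazyRecord(byte[] rawValue, Function<byte[], V> deserializer) {
        this.rawValue = rawValue;
        this.deserializer = deserializer;
    }

    /** Raw access: no deserialization happens here. */
    byte[] rawValue() {
        return rawValue;
    }

    /** Deserializes on the first call and caches the result for later calls. */
    V value() {
        if (!decoded) {
            cached = deserializer.apply(rawValue);
            decoded = true;
        }
        return cached;
    }

    boolean isDecoded() {
        return decoded;
    }
}
```

An application that only needs the bytes never pays for deserialization, while one that calls value() pays once per record.
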
> > > As for de-compression, today we allocate a new buffer for each
> > > de-compressed message, so it may be necessary to do de-compression with a
> > > re-usable buffer under memory control. I will create a ticket under
> > > KAFKA-1326 for that.
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks for the note. So if we do not deserialize till the last moment
> > > like
> > > > Jay suggested we would not need extra buffers for deserialization.
> > Unless
> > > > we need random access to messages it seems like we can deserialize
> > right
> > > at
> > > > the time of iteration and allocate objects only if the Consumer
> > actually
> > > > requires deserialized objects. Some applications like mine can just
> > > utilize
> > > > the ByteBuffer contents and in those cases we can take in a
> ByteBuffer
> > > or a
> > > > ConsumerRecord which points to a ByteBuffer and make it point to the
> > > right
> > > > slice of our response buffer. This ByteBuffer can be re-used over and
> > > over
> > > > again.
> > > >
> > > > As for decompression - that will require an extra buffer allocated at
> > > > startup, but not new allocations per response. I imagine the steps
> > would
> > > be
> > > > something like this:
> > > >
> > > > i) Receive entire response which includes possibly compressed
> messages
> > on
> > > > ResponseBuffer. ResponseBuffer is only allocated once.
> > > > ii) Once we have received an entire response, return an iterator to
> the
> > > > underlying ByteBuffer.
> > > > iii) On each message set check if it is compressed or not.
> > > > > iv) If compressed, decompress only that message set in a streaming
> > > > > manner and store the result in another preallocated buffer,
> > > > > DecompressedMessagesBuffer. When the consumer asks for the next
> > > > > message, alter the flyweight that it supplies to point to the correct
> > > > > slice in the DecompressedMessagesBuffer. Do this till all decompressed
> > > > > messages from that message set are consumed.
> > > > v) If the message set was not compressed, then alter the flyweight to
> > > point
> > > > to the correct slice in the ResponseBuffer. Do this till all messages
> > > from
> > > > this message set are consumed.
> > > >
> > > > So basically we will switch between the two buffers when messages are
> > > > compressed or otherwise stay only on the ResponseBuffer. With the
> lazy
> > > > approach another nice thing is we can do CRC validation right before
> > the
> > > > message gets consumed too. Which means that the CRC algorithm will
> scan
> > > > some bytes which will be hot in cache (since we are iterating a
> linear
> > > > array of bytes) and as soon as we have checked the bytes they will be
> > > > consumed by the application which means they will stay hot in L1
> cache.
> > > >
> > > > > All of this "theory crafting" is based on my currently incomplete
> > > > > understanding of the kafka protocol but it seems like compression is
> > > > > per message set so we can stream through. Also since in general we can
> > > > > iterate through the response buffer, we can do CRC validation right
> > > > > before the message is consumed.
> > > >
> > > > Thanks,
> > > > Rajiv
> > > >
> > > > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wa...@gmail.com>
> > > > wrote:
> > > >
> > > > > Rajiv,
> > > > >
> > > > > > A side note on re-using ByteBuffers: in the new consumer we do plan
> > > > > > to add a memory management module that will try to reuse allocated
> > > > > > buffers for fetch responses. But as Jay mentioned, for now
> > > > > > de-serialization and de-compression are done inside the poll() call,
> > > > > > which requires allocating another buffer to write the de-serialized
> > > > > > and de-compressed bytes. Hence even with fetch response buffer
> > > > > > management, today we still need to allocate new buffers if compressed
> > > > > > messages are delivered.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <ja...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Zijing, the new consumer will be in the next release. We don't
> > have a
> > > > > hard
> > > > > > date for this yet.
> > > > > >
> > > > > > Rajiv, I'm game if we can show a >= 20% performance improvement.
> It
> > > > > > certainly could be an improvement, but it might also be that the
> > CRC
> > > > > > validation and compression dominate.
> > > > > >
> > > > > > The first step would be
> > > > > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > > > > >
> > > > > > This would delay the deserialization of ConsumerRecords and make
> it
> > > > > > basically a wrapper around the actual MemoryRecords chunks which
> > are
> > > > > > basically ByteBuffer instances. We could then add an optional
> > > > > > ConsumerRecords param in poll, to allow you to hand back your
> > > > > > ConsumerRecords instance. Optimizing the ConsumerRecord instance
> > > reuse
> > > > > > would also be possible.
> > > > > >
> > > > > > That JIRA is actually useful irrespective of these optimizations
> > > though
> > > > > > because deferring the deserialization and decompression would
> allow
> > > you
> > > > > to
> > > > > > punt that work into a pool of processor threads. Since it is more
> > > > common
> > > > > to
> > > > > > see the real bottleneck be application serialization this could
> be
> > > > > > valuable.
> > > > > >
> > > > > > WRT the object reuse I wouldn't be shocked to learn that you
> > actually
> > > > get
> > > > > > equivalent stuff out of the jvm allocator's own pooling and/or
> > escape
> > > > > > analysis once we are doing the allocation on demand. So it would
> be
> > > > good
> > > > > to
> > > > > > show a real performance improvement on the newer JVMs before
> > deciding
> > > > to
> > > > > go
> > > > > > this route.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <
> > rajiv@signalfuse.com>
> > > > > > wrote:
> > > > > >
> > > > > > > > Just a follow up - I have implemented a pretty hacky prototype.
> > > > > > > > It's too unclean to share right now but I can clean it up if you
> > > > > > > > are interested. I don't think it offers anything that people don't
> > > > > > > > already know about though.
> > > > > > >
> > > > > > > > My prototype doesn't do any metadata requests yet but I have a
> > > > > > > > flyweight builder/parser of the FetchRequest and the FetchResponse
> > > > > > > > protocol that I based on the protocol page at
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > > > > .
> > > > > > > > On my consumer thread I allocate two buffers per broker: a small
> > > > > > > > request buffer and a bigger response buffer. My assumption is that
> > > > > > > > the response buffer will be big enough that it is never too small
> > > > > > > > for the response. If this assumption doesn't hold then we would
> > > > > > > > have to reallocate a bigger buffer. These are the steps I am
> > > > > > > > following right now:
> > > > > > >
> > > > > > > i) Whenever a client makes a request I fill up the relevant
> > request
> > > > > > > buffer(s) using my flyweight object instead of allocating an
> > object
> > > > and
> > > > > > > then serializing it. I then write this buffer to the
> > SocketChannel
> > > > > > > connected to the right Kafka broker. I don't yet handle the
> case
> > > > where
> > > > > > all
> > > > > > > of the request bytes could not be  written synchronously. If I
> > were
> > > > to
> > > > > > > handle that properly I would need to register WRITE interest on
> > the
> > > > > > > SocketChannel on incomplete writes.
> > > > > > >
> > > > > > > > ii) Whenever the client calls poll() I register read interest on
> > > > > > > > the socket channel using the selector and make a select call on the
> > > > > > > > underlying selector. If the selector says that the socket channel is
> > > > > > > > readable, I read all the bytes possible into my response ByteBuffer.
> > > > > > > > If I can read the first 4 bytes I know how big the response is. So I
> > > > > > > > wait till all response bytes have been read before attempting to
> > > > > > > > parse it. This might take multiple poll calls. Once I have received
> > > > > > > > the expected number of bytes I iterate through the response
> > > > > > > > ByteBuffer. There is no random access provided to the ByteBuffer
> > > > > > > > since there are way too many variable length fields. This is a
> > > > > > > > DISADVANTAGE of doing flyweight style parsing instead of
> > > > > > > > deserialization into POJOs with indexed data structures like maps.
> > > > > > > > I could build indexing with a non-allocating primitive hashmap like
> > > > > > > > the koloboke one mapping Topic/Partition/Messages to offsets in the
> > > > > > > > ByteBuffer if really required. For me the lack of random access has
> > > > > > > > not been a problem at all. Iteration is done just by providing a
> > > > > > > > ByteBuffer and offset (within ByteBuffer) pair for each message. In
> > > > > > > > my own application I wrap this ByteBuffer and offset pair in my own
> > > > > > > > flyweight which knows how to decode the data.
> > > > > > >
> > > > > > > iii) Once an entire response has been iterated through I can
> > re-use
> > > > > both
> > > > > > > the request as well as response buffers.
> > > > > > >
> > > > > > > > I am sure this can be improved upon a lot. I allocate the
> > > > > > > > following before starting my client:
> > > > > > > >     i) A Direct ByteBuffer for requests.
> > > > > > > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers
> > > > > > > > are chosen so that they can fit the biggest request/responses we
> > > > > > > > expect.
> > > > > > > >   iii) A flyweight that wraps the Request ByteBuffer and can be used
> > > > > > > > to write a particular kind of request. So far I have only written a
> > > > > > > > flyweight for a FetchRequest.
> > > > > > > >    iv) A flyweight that wraps the Response ByteBuffer and can be
> > > > > > > > used to iterate through the entire response including finding
> > > > > > > > errors. There is no random access allowed right now. So far I have
> > > > > > > > only written a flyweight parser for a FetchResponse.
> > > > > > > >     v) A flyweight for every type of application level message that
> > > > > > > > I expect. Iterating through the response ByteBuffer using the
> > > > > > > > flyweight in (iv) I get offsets into the ByteBuffer for each
> > > > > > > > individual message. My own messages work by using absolute position
> > > > > > > > getLong(position), getShort(position) etc calls on a ByteBuffer, so
> > > > > > > > this works out great for me. We could alternatively provide an API
> > > > > > > > that allocates a new ByteBuffer and copies the data for people who
> > > > > > > > don't want the zero allocation access.
> > > > > > >
> > > > > > > > Sadly in my profiling I noticed that the selector implementation in
> > > > > > > > the JDK allocates, but it seems like projects like Netty and Aeron
> > > > > > > > have worked around this by using reflection to replace the
> > > > > > > > underlying implementation to make it non-allocating. Other than that
> > > > > > > > I have absolutely zero allocations past the initial 4-5 allocations.
> > > > > > > > I also have absolutely zero copies in user space once the data lands
> > > > > > > > from the socket onto the ByteBuffer.
> > > > > > >
> > > > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <
> > > rajiv@signalfuse.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I had a few more thoughts on the new API. Currently we use
> > kafka
> > > to
> > > > > > > > transfer really compact messages - around 25-35 bytes each.
> Our
> > > use
> > > > > > case
> > > > > > > is
> > > > > > > > a lot of messages but each very small. Will it be possible to
> > do
> > > > the
> > > > > > > > following
> > > > > > > > to reuse a ConsumerRecord and the ConsumerRecords objects? We
> > > > employ
> > > > > > our
> > > > > > > > own binary encoding to encode our messages and it is encoded
> > as a
> > > > > > > flyweight
> > > > > > > > in SBE/Cap'n'Proto style and can be consumed without any
> > > > > > > > decoding/deserialization step. Currently we use the
> > > SimpleConsumer
> > > > > and
> > > > > > > have
> > > > > > > > found the fact that it hands out ByteBuffers very useful
> > instead
> > > > of a
> > > > > > > > mandatory deserialization into a POJO step. Even then through
> > > > memory
> > > > > > > > profiling we have found out that the ByteBuffers and the
> > records
> > > > take
> > > > > > > more
> > > > > > > > space than the actual messages themselves. Ideally we can
> > > allocate
> > > > a
> > > > > > big
> > > > > > > > ByteBuffer (maybe kafka already does it) to receive our data
> > and
> > > > then
> > > > > > > just
> > > > > > > > get some kind of a flyweight iterator on the ByteBuffer
> > something
> > > > > like
> > > > > > > this:
> > > > > > > >
> > > > > > > > > // Allocate a single ByteBuffer or have kafka allocate this internally.
> > > > > > > > > // Either way it would be very useful to just keep re-using this buffer.
> > > > > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > > > > // Create a couple of flyweights.
> > > > > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > > > > > // Subscribe to the topics we care about.
> > > > > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > > > > > while (running) {
> > > > > > > > >   // Use the buffer to get data from the server. Additionally re-use the
> > > > > > > > >   // same iterator.
> > > > > > > > >   // Resets the iterator to point to the start of the ByteBuffer.
> > > > > > > > >   consumer.poll(buffer, iterator, timeout);
> > > > > > > > >   // Now the iterator has pointers to the ByteBuffer and is capable of
> > > > > > > > >   // advancing the cursor to read every message.
> > > > > > > > >   while (iterator.hasNext()) {
> > > > > > > > >     // The consumer record is just a flyweight over a ByteBuffer and is
> > > > > > > > >     // adjusted to point to the start of the next record.
> > > > > > > > >     iterator.getNextInto(record);
> > > > > > > > >     // This is not a new ByteBuffer, it's just the big buffer with its
> > > > > > > > >     // position and limit adjusted so that we only read the current record.
> > > > > > > > >     // Alternatively we could give it our own ByteBuffer, but the point
> > > > > > > > >     // would be to not do a copy and instead adjust the supplied
> > > > > > > > >     // ByteBuffer's position and limit and pointer (through reflection) to
> > > > > > > > >     // point to the right slice of the actual big ByteBuffer.
> > > > > > > > >     // This means that we cannot stash this buffer in a hash map or any
> > > > > > > > >     // heap allocated structure since its contents keep changing as we
> > > > > > > > >     // iterate.
> > > > > > > > >     ByteBuffer recordBytes = record.getUnderlying();
> > > > > > > > >     // process() cannot keep a reference to the buffer - this is really
> > > > > > > > >     // the programmer's responsibility.
> > > > > > > > >     process(recordBytes);
> > > > > > > > >   }
> > > > > > > > > }
> > > > > > > >
> > > > > > > > Given how the new consumer is meant to be used from a single
> > > thread
> > > > > - I
> > > > > > > > think these optional non-allocating methods will be a great
> > boon
> > > > for
> > > > > > any
> > > > > > > > one trying to save memory or prevent heap churn. It has
> > exactly 0
> > > > > > copies
> > > > > > > in
> > > > > > > > user space too which is great for performance. In our case
> > since
> > > > the
> > > > > > > > messages are very tiny we end up spending more memory in all
> > the
> > > > > > wrapper
> > > > > > > > objects than in the actual messages so this would be a game
> > > changer
> > > > > for
> > > > > > > us.
> > > > > > > > I'd love to be able to contribute if this seems sensible.
> > > Hopefully
> > > > > > these
> > > > > > > > non-allocating methods can co-exist with the allocating ones
> > and
> > > > only
> > > > > > > users
> > > > > > > > who absolutely need to use them can make the trade-off  of
> > better
> > > > > > > > efficiency/performance for a slightly more error-prone and
> ugly
> > > > API.
> > > > > > > >
> > > > > > > > Thoughts?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Rajiv
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > > > > <altergzj@yahoo.com.invalid
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > >> Hi all, The document is very beautiful. What will the Kafka release
> > > > > > > > >> version for this be, and what is the timeline?
> > > > > > > > >> Thanks, Edwin
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > > > > >> rajiv@signalfuse.com> wrote:
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>  Awesome - can't wait for this version to be out!
> > > > > > > >>
> > > > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <
> > > jay.kreps@gmail.com>
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > > >> > The timeout in the poll call is more or less the timeout used by
> > > > > > > > >> > the selector. So each call to poll will do socket activity on any
> > > > > > > > >> > ready sockets, waiting for up to that time for a socket to be
> > > > > > > > >> > ready. There are no longer any background threads involved in the
> > > > > > > > >> > consumer; all activity is driven by the application thread(s).
> > > > > > > > >> >
> > > > > > > > >> > The max fetch request wait time is controlled with a config and is
> > > > > > > > >> > independent of the time given to poll.
> > > > > > > >> >
> > > > > > > >> > -Jay
> > > > > > > >> >
> > > > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > > > > rajiv@signalfuse.com>
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > I am trying to understand the semantics of the timeout
> > > > specified
> > > > > > in
> > > > > > > >> the
> > > > > > > >> > > poll method in
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > > >> > > .
> > > > > > > >> > > Is this timeout a measure of how long the fetch request
> > will
> > > > be
> > > > > > > >> parked on
> > > > > > > >> > > the broker waiting for a reply or is this more like the
> > > > timeout
> > > > > in
> > > > > > > >> > > selector.select(long timeout) i.e. the method will
> return
> > > with
> > > > > > > >> whatever
> > > > > > > >> > > data is there after waiting a maximum of timeout.
> Exposing
> > > the
> > > > > > > >> selector
> > > > > > > >> > > timeout will be very helpful for us because we want to
> > put a
> > > > > tight
> > > > > > > >> bound
> > > > > > > >> > on
> > > > > > > >> > > how long we are ready to wait on the poll call. When
> this
> > > API
> > > > is
> > > > > > > >> > available
> > > > > > > >> > > we plan to use a single thread to get data from kafka,
> > > process
> > > > > > them
> > > > > > > as
> > > > > > > >> > well
> > > > > > > >> > > as run periodic jobs. For the periodic jobs to run we
> > need a
> > > > > > > >> guarantee on
> > > > > > > >> > > how much time the poll call can take at most.
> > > > > > > >> > >
> > > > > > > >> > > Thanks!
> > > > > > > >> > >
> > > > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > > > > rajiv@signalfuse.com
> > > > > > > >
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Thanks!
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <
> > > jay.kreps@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > >> Err, here:
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > > >> > > >>
> > > > > > > >> > > >> -Jay
> > > > > > > >> > > >>
> > > > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > > > > jay.kreps@gmail.com>
> > > > > > > >> > wrote:
> > > > > > > >> > > >>
> > > > > > > >> > > >> > The current work in progress is documented here:
> > > > > > > >> > > >> >
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > > > > >> rajiv@signalfuse.com
> > > > > > > >> > >
> > > > > > > >> > > >> > wrote:
> > > > > > > >> > > >> >
> > > > > > > >> > > >> >> Is there a link to the proposed new consumer
> > > > non-blocking
> > > > > > API?
> > > > > > > >> > > >> >>
> > > > > > > >> > > >> >> Thanks,
> > > > > > > >> > > >> >> Rajiv
> > > > > > > >> > > >> >>
> > > > > > > >> > > >> >
> > > > > > > >> > > >> >
> > > > > > > >> > > >>
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka 0.9 consumer API

Posted by Guozhang Wang <wa...@gmail.com>.
Rajiv,

Those are good points. As for implementation, we have developed a class in
the producer that can probably be re-used for the consumer as well.

org.apache.kafka.clients.producer.internals.BufferPool
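
As a rough illustration of the pooling idea only (this is not the BufferPool code itself; SimpleBufferPool is a hypothetical, much simplified stand-in that neither bounds total memory nor blocks when the pool is empty):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical single-threaded pool of fixed-size buffers. */
final class SimpleBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    SimpleBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Hands out a pooled buffer if one is free, otherwise allocates a new one. */
    ByteBuffer acquire() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(bufferSize);
    }

    /** Returns a buffer to the pool; the caller must not touch it afterwards. */
    void release(ByteBuffer buffer) {
        buffer.clear();  // resets position and limit; the contents are simply overwritten later
        free.addFirst(buffer);
    }
}
```

In steady state the consumer would cycle through the same few buffers instead of allocating one per fetch response.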

Please feel free to add more comments on KAFKA-2045.

Guozhang


On Tue, Mar 24, 2015 at 12:21 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Guozhang,
>
> Yeah the main motivation is to not require de-serialization but still allow
> the consumer to de-serialize into objects if they really want to. Another
> motivation for iterating over the ByteBuffer on the fly is that we can
> prevent copies all together. This has an added implication though. Since we
> re-use the same response buffer continuously and iterate over it, this
> means that the iteration has to be done in a single threaded manner
> specifically on the thread that did the poll. The work flow will look a bit
> like this:
>
> i) Re-use the same request buffer to create a request and write to the
> socket.
> ii) On poll re-use the same response buffer to read in the response till it
> is complete.
> iii) When the response is complete respond with an iterator to the response
> ByteBuffer. The consumer must now consume the entire ByteBuffer on this
> thread since we use a single mutable iterator to go through the
> ByteBuffer.
>
> It is trickier when we consider that during iteration the consumer might
> send more kafka requests and call poll further. We can maintain a pointer
> to the end of the response ByteBuffer to allow more responses to stream in
> and essentially use the response buffer as a circular buffer. So basically
> there would be two pointers - iteration_pointer and
> network_append_pointer, with iteration_pointer <=
> network_append_pointer. If the network_append_pointer reaches the end of
> the buffer, then we cannot accept any more data without the
> iteration_pointer having progressed further. Since responses have to be
> contiguous, we cannot have a response straddle the circular buffer's end.
> We'd have to detect this case and copy the response to the
> beginning of the buffer so that it is contiguous. Though this is a bit more
> complicated, it ensures that we don't have to move in lockstep and can
> pipeline requests/responses. I have written something like this for another
> application and since this is all happening in a single thread it is a bit
> easier and I think possible.
>
> Like Jay suggested, if application level deserialization is a bottleneck
> that needs to be solved by passing slices of these ByteBuffers out to a
> pool of threads, then this approach WILL NOT work since we expect the
> ByteBuffer to be linearly iterated in one go. If we want slices to be
> passed to a pool of threads then copying to new ByteBuffers is probably the
> only good option. For my application that is definitely not the case since
> deserialization is free and the cache friendliness of iterating over a hot
> buffer trumps every other factor. But for applications with more involved
> deserialization we can re-use the above framework. The low level
> iterator (discussed in paragraphs 2 and 3) can be wrapped in a higher level
> iterator that just copies the bytes for each message to a new ByteBuffer
> and hands them over. The low level iterator is still responsible for buffer
> management and the higher level iterator is just a plain old consumer that
> consumes the bytes by copying them over to a new ByteBuffer and handing
> them to the application. The application is now free to transfer these to
> other threads for processing/deserialization.
>
> Thanks for creating KAFKA-1326. Let me know what your thoughts are on this
> proposed design.
>
> Thanks,
> Rajiv
>
>
> On Tue, Mar 24, 2015 at 9:22 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi Rajiv,
> >
> > Just want to clarify, that the main motivation for iterating over the
> byte
> > buffer directly instead of iterating over the records is for not
> enforcing
> > de-serialization, right? I think that can be done by passing the
> > deserializer class info into the consumer record instead of putting the
> > de-serialized objects into the consumer, and only do de-serialization
> > on-the-fly when key/value fields are requested while exposing another API
> > to return the underlying raw bytes from the record. What do you think?
> >
> > As for de-compression, today we allocate new buffer for each
> de-compressed
> > message, and it may be required to do de-compression with re-useable
> buffer
> > with memory control. I will create a ticket under KAFKA-1326 for that.
> >
> > Guozhang
> >
> >
> >
> > On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for the note. So if we do not deserialize till the last moment
> > like
> > > Jay suggested we would not need extra buffers for deserialization.
> Unless
> > > we need random access to messages it seems like we can deserialize
> right
> > at
> > > the time of iteration and allocate objects only if the Consumer
> actually
> > > requires deserialized objects. Some applications like mine can just
> > utilize
> > > the ByteBuffer contents and in those cases we can take in a ByteBuffer
> > or a
> > > ConsumerRecord which points to a ByteBuffer and make it point to the
> > right
> > > slice of our response buffer. This ByteBuffer can be re-used over and
> > over
> > > again.
> > >
> > > As for decompression - that will require an extra buffer allocated at
> > > startup, but not new allocations per response. I imagine the steps
> would
> > be
> > > something like this:
> > >
> > > i) Receive entire response which includes possibly compressed messages
> on
> > > ResponseBuffer. ResponseBuffer is only allocated once.
> > > ii) Once we have received an entire response, return an iterator to the
> > > underlying ByteBuffer.
> > > iii) On each message set check if it is compressed or not.
> > > iv) If compressed decompress only that message set in a streaming
> manner
> > > and stores the result in another preallocated buffer
> > > DecompressedMessagesBuffer. When the consumer asks for the next message
> > > alter the flyweight that it supplies you to point to the correct slice
> in
> > > the DecompressedMessagesBuffer. Do this till all decompressed messages
> > from
> > > that message set are consumed.
> > > v) If the message set was not compressed, then alter the flyweight to
> > point
> > > to the correct slice in the ResponseBuffer. Do this till all messages
> > from
> > > this message set are consumed.
> > >
> > > So basically we will switch between the two buffers when messages are
> > > compressed or otherwise stay only on the ResponseBuffer. With the lazy
> > > approach another nice thing is we can do CRC validation right before
> the
> > > message gets consumed too. Which means that the CRC algorithm will scan
> > > some bytes which will be hot in cache (since we are iterating a linear
> > > array of bytes) and as soon as we have checked the bytes they will be
> > > consumed by the application which means they will stay hot in L1 cache.
> > >
> > > All of this "theory crafting" is based on my currently incomplete
> > > understanding of the kafka protocol but it seems like compression is
> per
> > > message set so we can stream though. Also since in general we can
> iterate
> > > through the response buffer, we can do CRC validation right before the
> > > message is consumed.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > Rajiv,
> > > >
> > > > A side note for re-using ByteBuffer: in the new consumer we do plan
> to
> > > add
> > > > some memory management module such that it will try to reuse
> allocated
> > > > buffer for fetch responses. But as Jay mentioned, for now inside the
> > > poll()
> > > > call de-serialization and de-compression is done which requires to
> > > allocate
> > > > another buffer to write the de-serialized and de-compressed bytes,
> > hence
> > > > even with fetch response buffer management today we still need to
> > > allocate
> > > > new buffers if compressed messages are delivered.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >
> > > > > Zijing, the new consumer will be in the next release. We don't
> have a
> > > > hard
> > > > > date for this yet.
> > > > >
> > > > > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > > > > certainly could be an improvement, but it might also be that the
> CRC
> > > > > validation and compression dominate.
> > > > >
> > > > > The first step would be
> > > > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > > > >
> > > > > This would delay the deserialization of ConsumerRecords and make it
> > > > > basically a wrapper around the actual MemoryRecords chunks which
> are
> > > > > basically ByteBuffer instances. We could then add an optional
> > > > > ConsumerRecords param in poll, to allow you to hand back your
> > > > > ConsumerRecords instance. Optimizing the ConsumerRecord instance
> > reuse
> > > > > would also be possible.
> > > > >
> > > > > That JIRA is actually useful irrespective of these optimizations
> > though
> > > > > because deferring the deserialization and decompression would allow
> > you
> > > > to
> > > > > punt that work into a pool of processor threads. Since it is more
> > > common
> > > > to
> > > > > see the real bottleneck be application serialization this could be
> > > > > valuable.
> > > > >
> > > > > WRT the object reuse I wouldn't be shocked to learn that you
> actually
> > > get
> > > > > equivalent stuff out of the jvm allocator's own pooling and/or
> escape
> > > > > analysis once we are doing the allocation on demand. So it would be
> > > good
> > > > to
> > > > > show a real performance improvement on the newer JVMs before
> deciding
> > > to
> > > > go
> > > > > this route.
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <
> rajiv@signalfuse.com>
> > > > > wrote:
> > > > >
> > > > > > Just a follow up - I have implemented a pretty hacky prototype
> It's
> > > too
> > > > > > unclean to share right now but I can clean it up if you are
> > > > interested. I
> > > > > > don't think it offers anything that people already don't know
> about
> > > > > though.
> > > > > >
> > > > > > My prototype doesn't do any metadata requests yet but I have a
> > > > flyweight
> > > > > > builder/parser of the FetchRequest and the FetchResponse protocol
> > > that
> > > > I
> > > > > > based on the protocol page at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > > .
> > > > > > On my consumer thread I allocate two buffers per broker a small
> > > request
> > > > > > buffer and a bigger response buffer. My assumption is that the
> > > response
> > > > > > buffer will be big enough that it is never too small for the
> > > response.
> > > > > If
> > > > > > this assumption doesn't hold then we would have to reallocate a
> > > bigger
> > > > > > buffer. These are the steps I am following right now:
> > > > > >
> > > > > > i) Whenever a client makes a request I fill up the relevant
> request
> > > > > > buffer(s) using my flyweight object instead of allocating an
> object
> > > and
> > > > > > then serializing it. I then write this buffer to the
> SocketChannel
> > > > > > connected to the right Kafka broker. I don't yet handle the case
> > > where
> > > > > all
> > > > > > of the request bytes could not be  written synchronously. If I
> were
> > > to
> > > > > > handle that properly I would need to register WRITE interest on
> the
> > > > > > SocketChannel on incomplete writes.
> > > > > >
> > > > > > ii) Whenever the client calls poll() I register read interest on
> > the
> > > > > socket
> > > > > > channel using the selector and make a select call on the
> underlying
> > > > > > selector. If the selector says that the socket channel is
> > readable, I
> > > > > read
> > > > > > all the bytes possible into my response ByteBuffer. If I can read
> > the
> > > > > first
> > > > > > 4 bytes I know how big the response is. So I wait till all
> response
> > > > bytes
> > > > > > have been read before attempting to parse it. This might take
> > > multiple
> > > > > poll
> > > > > > calls. Once I have received the expected number of bytes I
> iterate
> > > > > through
> > > > > > the response ByteBuffer. There is no random access access
> provided
> > to
> > > > the
> > > > > > ByteBuffer since there are way too many variable length fields.
> > This
> > > a
> > > > > > DISADVANTAGE of doing flyweight style parsing instead of
> > > > deserialization
> > > > > > into POJOs with indexed data structures like maps.  I could build
> > > > > indexing
> > > > > > with a non-allocating primitive hashmap like the koloboke one
> > mapping
> > > > > > Topic/Partition/Messages to offsets in the ByteBuffer if really
> > > > required.
> > > > > > For me the lack of random access has not been a problem at all.
> > > > Iteration
> > > > > > is done just by providing a ByteBuffer and offset (within
> > ByteBuffer)
> > > > > pair
> > > > > > for each message. In my own application I wrap this ByteBuffer
> and
> > > > offset
> > > > > > pair in my own flyweight which knows how to decode the data.
> > > > > >
> > > > > > iii) Once an entire response has been iterated through I can
> re-use
> > > > both
> > > > > > the request as well as response buffers.
> > > > > >
> > > > > > I am sure this can be improved upon a lot.  I allocate the
> > following
> > > > > before
> > > > > > starting my client:
> > > > > >     i) A Direct ByteBuffer for requests.
> > > > > >    ii) A Direct ByteBuffer for responses. Sizes of the
> ByteBuffers
> > > are
> > > > > > chosen so that they can fit the biggest request/responses we
> > expect.
> > > > > >    ii) A flyweight that wraps the Request ByteBuffer and can be
> > used
> > > to
> > > > > > write a particular kind of request.  So far I have only written a
> > > > > flyweight
> > > > > > for a FetchRequest.
> > > > > >   iii) A flyweight that wraps the Response ByteBuffer and can be
> > used
> > > > to
> > > > > > iterate through the entire response including finding errors.
> There
> > > is
> > > > no
> > > > > > random access allowed right now. So far I have only written a
> > > flyweight
> > > > > > parser for a FetchResponse.
> > > > > >   iv) A flyweight for every type of application level message
> that
> > I
> > > > > > expect. Iterating through the response ByteBuffer using the
> > flyweight
> > > > in
> > > > > > (iii) I get offsets into the ByteBuffer for each individual
> > message.
> > > My
> > > > > own
> > > > > > messages work by using absolute position getLong(position),
> > > > > > getShort(position) etc calls on a ByteBuffer, so this works out
> > great
> > > > for
> > > > > > me. We could alternatively provide an API that does allocates a
> new
> > > > > > ByteBuffer and copies the data for people who don't want the zero
> > > > > > allocation access.
> > > > > >
> > > > > > Sadly in my profiling I noticed that selector implementation in
> the
> > > > > > JDK allocates
> > > > > > but it seems like projects like Netty, Aeron have worked around
> > this
> > > by
> > > > > > using reflection to replace the underlying implementation to make
> > it
> > > > > > non-allocating. Other than that I have absolutely zero
> allocations
> > > past
> > > > > the
> > > > > > initial 4-5 allocations. I also have absolutely zero copies in
> user
> > > > space
> > > > > > once the data lands from the socket onto the ByteBuffer.
> > > > > >
> > > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <
> > rajiv@signalfuse.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I had a few more thoughts on the new API. Currently we use
> kafka
> > to
> > > > > > > transfer really compact messages - around 25-35 bytes each. Our
> > use
> > > > > case
> > > > > > is
> > > > > > > a lot of messages but each very small. Will it be possible to
> do
> > > the
> > > > > > > following
> > > > > > > to reuse a ConsumerRecord and the ConsumerRecords objects? We
> > > employ
> > > > > our
> > > > > > > own binary encoding to encode our messages and it is encoded
> as a
> > > > > > flyweight
> > > > > > > in SBE/Cap'n'Proto style and can be consumed without any
> > > > > > > decoding/deserialization step. Currently we use the
> > SimpleConsumer
> > > > and
> > > > > > have
> > > > > > > found the fact that it hands out ByteBuffers very useful
> instead
> > > of a
> > > > > > > mandatory deserialization into a POJO step. Even then through
> > > memory
> > > > > > > profiling we have found out that the ByteBuffers and the
> records
> > > take
> > > > > > more
> > > > > > > space than the actual messages themselves. Ideally we can
> > allocate
> > > a
> > > > > big
> > > > > > > ByteBuffer (maybe kafka already does it) to receive our data
> and
> > > then
> > > > > > just
> > > > > > > get some kind of a flyweight iterator on the ByteBuffer
> something
> > > > like
> > > > > > this:
> > > > > > >
> > > > > > > // Allocate a single ByteBuffer or have kafka allocate this
> > > > internally.
> > > > > > > Either way it would be very useful to just keep re-using this
> > > buffer.
> > > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > > // Create a coupe of flyweights.
> > > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > > > // Subscribe to the topics we care about.
> > > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > > > while (running) {
> > > > > > >   // Use the buffer to get data from the server. Additionally
> > > re-use
> > > > > the
> > > > > > > same iterator.
> > > > > > >   // Resets the iterator to point to the start of the
> ByteBuffer.
> > > > > > >   consumer.poll(buffer, iterator, timeout);
> > > > > > >   // Now the iterator has pointers to the ByteBuffer and is
> > capable
> > > > of
> > > > > > > advancing the cursor to read every message.
> > > > > > >   while (iterator.hasNext()) {
> > > > > > >     // The consumer record is just a flyweight over a
> ByteBuffer
> > > and
> > > > is
> > > > > > > adjusted to point to the start of the next record.
> > > > > > >     iterator.getNextInto(record);
> > > > > > >     // This is not a new ByteBuffer, its just the big buffer
> with
> > > its
> > > > > > > position and limit adjusted so that we only read the current
> > > record.
> > > > > > >     // Alternatively we could give it our own ByteBuffer, but
> the
> > > > point
> > > > > > > would be to not do a copy and instead adjust the supplied
> > > > > > >     // ByteBuffer's position and limit and pointer (through
> > > > reflection)
> > > > > > to
> > > > > > > point to the right slice of the actual big ByteBuffer.
> > > > > > >     // This means that we cannot stash this buffer in a hash
> map
> > or
> > > > any
> > > > > > > heap allocated structure since it's contents keep changing as
> we
> > > > > iterate.
> > > > > > >     ByteBuffer buffer = record.getUnderlying();
> > > > > > >     process(buffer);  // Process cannot keep a reference to the
> > > > buffer
> > > > > -
> > > > > > > this is really the programmer's responsibility.
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > Given how the new consumer is meant to be used from a single
> > thread
> > > > - I
> > > > > > > think these optional non-allocating methods will be a great
> boon
> > > for
> > > > > any
> > > > > > > one trying to save memory or prevent heap churn. It has
> exactly 0
> > > > > copies
> > > > > > in
> > > > > > > user space too which is great for performance. In our case
> since
> > > the
> > > > > > > messages are very tiny we end up spending more memory in all
> the
> > > > > wrapper
> > > > > > > objects than in the actual messages so this would be a game
> > changer
> > > > for
> > > > > > us.
> > > > > > > I'd love to be able to contribute if this seems sensible.
> > Hopefully
> > > > > these
> > > > > > > non-allocating methods can co-exist with the allocating ones
> and
> > > only
> > > > > > users
> > > > > > > who absolutely need to use them can make the trade-off  of
> better
> > > > > > > efficiency/performance for a slightly more error-prone and ugly
> > > API.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Rajiv
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > > > <altergzj@yahoo.com.invalid
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,The document is very beautiful and the Kafka release
> > > version
> > > > > for
> > > > > > >> this will be? and what is the timeline?
> > > > > > >> ThanksEdwin
> > > > > > >>
> > > > > > >>
> > > > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > > > >> rajiv@signalfuse.com> wrote:
> > > > > > >>
> > > > > > >>
> > > > > > >>  Awesome - can't wait for this version to be out!
> > > > > > >>
> > > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <
> > jay.kreps@gmail.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > The timeout in the poll call is more or less the timeout
> used
> > by
> > > > the
> > > > > > >> > selector. So each call to poll will do socket activity on
> any
> > > > ready
> > > > > > >> > sockets, waiting for up to that time for a socket to be
> ready.
> > > > There
> > > > > > is
> > > > > > >> no
> > > > > > >> > longer any background threads involved in the consumer, all
> > > > activity
> > > > > > is
> > > > > > >> > driven by the application thread(s).
> > > > > > >> >
> > > > > > >> > The max fetch request wait time is controlled with a config
> > and
> > > is
> > > > > > >> > independent of the time given to poll.
> > > > > > >> >
> > > > > > >> > -Jay
> > > > > > >> >
> > > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > > > rajiv@signalfuse.com>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > I am trying to understand the semantics of the timeout
> > > specified
> > > > > in
> > > > > > >> the
> > > > > > >> > > poll method in
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >> > > .
> > > > > > >> > > Is this timeout a measure of how long the fetch request
> will
> > > be
> > > > > > >> parked on
> > > > > > >> > > the broker waiting for a reply or is this more like the
> > > timeout
> > > > in
> > > > > > >> > > selector.select(long timeout) i.e. the method will return
> > with
> > > > > > >> whatever
> > > > > > >> > > data is there after waiting a maximum of timeout. Exposing
> > the
> > > > > > >> selector
> > > > > > >> > > timeout will be very helpful for us because we want to
> put a
> > > > tight
> > > > > > >> bound
> > > > > > >> > on
> > > > > > >> > > how long we are ready to wait on the poll call. When this
> > API
> > > is
> > > > > > >> > available
> > > > > > >> > > we plan to use a single thread to get data from kafka,
> > process
> > > > > them
> > > > > > as
> > > > > > >> > well
> > > > > > >> > > as run periodic jobs. For the periodic jobs to run we
> need a
> > > > > > >> guarantee on
> > > > > > >> > > how much time the poll call can take at most.
> > > > > > >> > >
> > > > > > >> > > Thanks!
> > > > > > >> > >
> > > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > > > rajiv@signalfuse.com
> > > > > > >
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Thanks!
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <
> > jay.kreps@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Err, here:
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >> > > >>
> > > > > > >> > > >> -Jay
> > > > > > >> > > >>
> > > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > > > jay.kreps@gmail.com>
> > > > > > >> > wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > The current work in progress is documented here:
> > > > > > >> > > >> >
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > > > >> rajiv@signalfuse.com
> > > > > > >> > >
> > > > > > >> > > >> > wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> >> Is there a link to the proposed new consumer
> > > non-blocking
> > > > > API?
> > > > > > >> > > >> >>
> > > > > > >> > > >> >> Thanks,
> > > > > > >> > > >> >> Rajiv
> > > > > > >> > > >> >>
> > > > > > >> > > >> >
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Hi Guozhang,

Yeah, the main motivation is to not require de-serialization but still allow
the consumer to de-serialize into objects if they really want to. Another
motivation for iterating over the ByteBuffer on the fly is that we can
avoid copies altogether. This has an added implication though. Since we
re-use the same response buffer continuously and iterate over it, the
iteration has to be done in a single-threaded manner, specifically on the
thread that did the poll. The workflow will look a bit like this:

i) Re-use the same request buffer to create a request and write to the
socket.
ii) On poll, re-use the same response buffer to read in the response till it
is complete.
iii) When the response is complete, respond with an iterator to the response
ByteBuffer. The consumer must now consume the entire ByteBuffer on this
thread since we use a single mutable iterator to go through the
ByteBuffer.
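
A minimal sketch of steps (ii) and (iii), assuming each response is preceded
by a 4-byte big-endian size and that only one response is in flight at a
time. ResponseReader and its methods are hypothetical names used for
illustration; they are not part of the Kafka client.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not a Kafka class): one reusable buffer per connection. */
final class ResponseReader {
    private static final int SIZE_PREFIX = 4; // int32 length preceding each response
    private final ByteBuffer network;         // filled by channel.read(network)
    private final ByteBuffer payload;         // reusable view over the same bytes

    ResponseReader(int capacity) {
        network = ByteBuffer.allocate(capacity);
        payload = network.duplicate(); // shares content, independent position/limit
    }

    /** Buffer to hand to SocketChannel.read(...) on each poll. */
    ByteBuffer networkBuffer() {
        return network;
    }

    /** True once the size prefix and the whole body have arrived. */
    boolean isComplete() {
        if (network.position() < SIZE_PREFIX) {
            return false;
        }
        int size = network.getInt(0); // absolute read, does not move position
        return network.position() >= SIZE_PREFIX + size;
    }

    /** Positions the shared view over the response body; valid until reset(). */
    ByteBuffer payload() {
        if (!isComplete()) {
            throw new IllegalStateException("response not fully read yet");
        }
        int size = network.getInt(0);
        payload.clear();
        payload.limit(SIZE_PREFIX + size);
        payload.position(SIZE_PREFIX);
        return payload;
    }

    /** Call after the iterator has consumed the whole response. */
    void reset() {
        network.clear();
    }
}
```

The view returned by payload() is the same object every time, so the caller
must finish with it on the polling thread before calling reset().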

It is trickier when we consider that during iteration the consumer might
send more kafka requests and call poll further. We can maintain a pointer
to the end of the response ByteBuffer to allow more responses to stream in
and essentially use the response buffer as a circular buffer. So basically
there would be two pointers, iteration_pointer and network_append_pointer,
with iteration_pointer <= network_append_pointer. If the
network_append_pointer reaches the end of the buffer, then we cannot accept
any more data without the iteration_pointer having progressed further. Since
responses have to be contiguous, we cannot have a response straddle the
circular buffer's end. We'd have to detect this case and copy the response
to the beginning of the buffer so that it is contiguous. Though this is a bit
more complicated, it ensures that we don't have to move in lockstep and can
pipeline requests/responses. I have written something like this for another
application and since this is all happening in a single thread it is a bit
easier and I think possible.
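
A rough single-threaded sketch of the two-pointer idea, assuming
size-prefixed responses. TwoPointerBuffer is a hypothetical name. As a
simplification it moves all unread bytes to the front when the tail runs out
of room, rather than only the response that would straddle the end, which is
simpler but copies more.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: iterationPointer <= networkAppendPointer always holds. */
final class TwoPointerBuffer {
    private static final int SIZE_PREFIX = 4;
    private final byte[] data;
    private final ByteBuffer view;    // wraps data for absolute getInt reads
    private int iterationPointer;     // next unread byte
    private int networkAppendPointer; // next free byte

    TwoPointerBuffer(int capacity) {
        data = new byte[capacity];
        view = ByteBuffer.wrap(data);
    }

    /** Appends bytes from the network; returns how many were accepted. */
    int append(byte[] src, int offset, int length) {
        if (data.length - networkAppendPointer < length && iterationPointer > 0) {
            moveUnreadToFront(); // keeps every response contiguous
        }
        int accepted = Math.min(length, data.length - networkAppendPointer);
        System.arraycopy(src, offset, data, networkAppendPointer, accepted);
        networkAppendPointer += accepted;
        return accepted;
    }

    /** Size of the next fully received response, or -1 if it is incomplete. */
    int nextResponseSize() {
        int available = networkAppendPointer - iterationPointer;
        if (available < SIZE_PREFIX) {
            return -1;
        }
        int size = view.getInt(iterationPointer);
        return available >= SIZE_PREFIX + size ? size : -1;
    }

    /** Index of the first payload byte of the next response. */
    int payloadOffset() {
        return iterationPointer + SIZE_PREFIX;
    }

    byte byteAt(int index) {
        return data[index];
    }

    /** Advances the iteration pointer past the current response. */
    void consume() {
        int size = nextResponseSize();
        if (size < 0) {
            throw new IllegalStateException("no complete response to consume");
        }
        iterationPointer += SIZE_PREFIX + size;
        if (iterationPointer == networkAppendPointer) {
            iterationPointer = 0; // buffer drained: start again from the front
            networkAppendPointer = 0;
        }
    }

    private void moveUnreadToFront() {
        int unread = networkAppendPointer - iterationPointer;
        System.arraycopy(data, iterationPointer, data, 0, unread);
        iterationPointer = 0;
        networkAppendPointer = unread;
    }
}
```

When append() returns fewer bytes than offered, the caller has to hold off on
reading from the socket until the iteration side has consumed more data.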

Like Jay suggested, if application level deserialization is a bottleneck
that needs to be solved by passing slices of these ByteBuffers out to a
pool of threads, then this approach WILL NOT work since we expect the
ByteBuffer to be linearly iterated in one go. If we want slices to be
passed to a pool of threads then copying to new ByteBuffers is probably the
only good option. For my application that is definitely not the case since
deserialization is free and the cache friendliness of iterating over a hot
buffer trumps every other factor. But for applications with more involved
deserialization we can re-use the above framework. The low level
iterator (discussed in paragraphs 2 and 3) can be wrapped in a higher level
iterator that just copies the bytes for each message to a new ByteBuffer
and hands them over. The low level iterator is still responsible for buffer
management and the higher level iterator is just a plain old consumer that
consumes the bytes by copying them over to a new ByteBuffer and handing
them to the application. The application is now free to transfer these to
other threads for processing/deserialization.
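
A sketch of that layering, assuming Java 8 or later and a simplified framing
in which every message is a 4-byte size followed by its bytes (the real Kafka
message set layout has more fields). FlyweightIterator and CopyingIterator
are hypothetical names, not existing Kafka classes.

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Low level: walks size-prefixed messages in a shared buffer without allocating. */
final class FlyweightIterator {
    private final ByteBuffer shared;
    private final int end;
    private int cursor;
    private int offset;
    private int length;

    FlyweightIterator(ByteBuffer shared) {
        this.shared = shared;
        this.cursor = shared.position();
        this.end = shared.limit();
    }

    /** Moves to the next message; false when none is left or data is truncated. */
    boolean advance() {
        if (end - cursor < 4) {
            return false;
        }
        int size = shared.getInt(cursor);
        if (size < 0 || size > end - cursor - 4) {
            return false;
        }
        offset = cursor + 4;
        length = size;
        cursor = offset + length;
        return true;
    }

    ByteBuffer buffer() { return shared; }
    int offset() { return offset; }
    int length() { return length; }
}

/** High level: copies each message out so it can be handed to other threads. */
final class CopyingIterator implements Iterator<ByteBuffer> {
    private final FlyweightIterator low;
    private boolean ready;

    CopyingIterator(FlyweightIterator low) {
        this.low = low;
    }

    @Override
    public boolean hasNext() {
        if (!ready) {
            ready = low.advance();
        }
        return ready;
    }

    @Override
    public ByteBuffer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        ready = false;
        ByteBuffer copy = ByteBuffer.allocate(low.length());
        for (int i = 0; i < low.length(); i++) {
            copy.put(i, low.buffer().get(low.offset() + i)); // absolute put and get
        }
        return copy; // independent of the shared buffer from here on
    }
}
```

Applications that can read straight from the shared buffer would use the low
level iterator directly and skip the copy.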

Thanks for creating KAFKA-1326. Let me know what your thoughts are on this
proposed design.

Thanks,
Rajiv


On Tue, Mar 24, 2015 at 9:22 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Rajiv,
>
> Just want to clarify, that the main motivation for iterating over the byte
> buffer directly instead of iterating over the records is for not enforcing
> de-serialization, right? I think that can be done by passing the
> deserializer class info into the consumer record instead of putting the
> de-serialized objects into the consumer, and only do de-serialization
> on-the-fly when key/value fields are requested while exposing another API
> to return the underlying raw bytes from the record. What do you think?
>
> As for de-compression, today we allocate new buffer for each de-compressed
> message, and it may be required to do de-compression with re-useable buffer
> with memory control. I will create a ticket under KAFKA-1326 for that.
>
> Guozhang
>
>
>
> On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for the note. So if we do not deserialize till the last moment
> > like Jay suggested we would not need extra buffers for deserialization.
> > Unless we need random access to messages it seems like we can deserialize
> > right at the time of iteration and allocate objects only if the Consumer
> > actually requires deserialized objects. Some applications like mine can
> > just utilize the ByteBuffer contents and in those cases we can take in a
> > ByteBuffer or a ConsumerRecord which points to a ByteBuffer and make it
> > point to the right slice of our response buffer. This ByteBuffer can be
> > re-used over and over again.
> >
> > As for decompression - that will require an extra buffer allocated at
> > startup, but not new allocations per response. I imagine the steps would
> > be something like this:
> >
> > i) Receive entire response which includes possibly compressed messages
> > on ResponseBuffer. ResponseBuffer is only allocated once.
> > ii) Once we have received an entire response, return an iterator to the
> > underlying ByteBuffer.
> > iii) On each message set check if it is compressed or not.
> > iv) If compressed, decompress only that message set in a streaming manner
> > and store the result in another preallocated buffer,
> > DecompressedMessagesBuffer. When the consumer asks for the next message,
> > alter the flyweight that it supplies you to point to the correct slice in
> > the DecompressedMessagesBuffer. Do this till all decompressed messages
> > from that message set are consumed.
> > v) If the message set was not compressed, then alter the flyweight to
> > point to the correct slice in the ResponseBuffer. Do this till all
> > messages from this message set are consumed.
> >
> > So basically we will switch between the two buffers when messages are
> > compressed or otherwise stay only on the ResponseBuffer. With the lazy
> > approach another nice thing is we can do CRC validation right before the
> > message gets consumed too. Which means that the CRC algorithm will scan
> > some bytes which will be hot in cache (since we are iterating a linear
> > array of bytes) and as soon as we have checked the bytes they will be
> > consumed by the application which means they will stay hot in L1 cache.
> >
> > All of this "theory crafting" is based on my currently incomplete
> > understanding of the kafka protocol but it seems like compression is per
> > message set so we can stream through it. Also since in general we can
> > iterate through the response buffer, we can do CRC validation right
> > before the message is consumed.
> >
> > Thanks,
> > Rajiv
> >
> > On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Rajiv,
> > >
> > > A side note for re-using ByteBuffer: in the new consumer we do plan to
> > > add some memory management module such that it will try to reuse
> > > allocated buffer for fetch responses. But as Jay mentioned, for now
> > > inside the poll() call de-serialization and de-compression is done which
> > > requires to allocate another buffer to write the de-serialized and
> > > de-compressed bytes, hence even with fetch response buffer management
> > > today we still need to allocate new buffers if compressed messages are
> > > delivered.
> > >
> > > Guozhang
> > >
> > > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > > > Zijing, the new consumer will be in the next release. We don't have a
> > > > hard date for this yet.
> > > >
> > > > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > > > certainly could be an improvement, but it might also be that the CRC
> > > > validation and compression dominate.
> > > >
> > > > The first step would be
> > > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > > >
> > > > This would delay the deserialization of ConsumerRecords and make it
> > > > basically a wrapper around the actual MemoryRecords chunks which are
> > > > basically ByteBuffer instances. We could then add an optional
> > > > ConsumerRecords param in poll, to allow you to hand back your
> > > > ConsumerRecords instance. Optimizing the ConsumerRecord instance
> > > > reuse would also be possible.
> > > >
> > > > That JIRA is actually useful irrespective of these optimizations
> > > > though because deferring the deserialization and decompression would
> > > > allow you to punt that work into a pool of processor threads. Since it
> > > > is more common to see the real bottleneck be application serialization
> > > > this could be valuable.
> > > >
> > > > WRT the object reuse I wouldn't be shocked to learn that you actually
> > get
> > > > equivalent stuff out of the jvm allocator's own pooling and/or escape
> > > > analysis once we are doing the allocation on demand. So it would be
> > good
> > > to
> > > > show a real performance improvement on the newer JVMs before deciding
> > to
> > > go
> > > > this route.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > wrote:
> > > >
> > > > > > Just a follow up - I have implemented a pretty hacky prototype. It's
> > too
> > > > > unclean to share right now but I can clean it up if you are
> > > interested. I
> > > > > don't think it offers anything that people already don't know about
> > > > though.
> > > > >
> > > > > My prototype doesn't do any metadata requests yet but I have a
> > > flyweight
> > > > > builder/parser of the FetchRequest and the FetchResponse protocol
> > that
> > > I
> > > > > based on the protocol page at
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > .
> > > > > On my consumer thread I allocate two buffers per broker a small
> > request
> > > > > buffer and a bigger response buffer. My assumption is that the
> > response
> > > > > buffer will be big enough that it is never too small for the
> > response.
> > > > If
> > > > > this assumption doesn't hold then we would have to reallocate a
> > bigger
> > > > > buffer. These are the steps I am following right now:
> > > > >
> > > > > i) Whenever a client makes a request I fill up the relevant request
> > > > > buffer(s) using my flyweight object instead of allocating an object
> > and
> > > > > then serializing it. I then write this buffer to the SocketChannel
> > > > > connected to the right Kafka broker. I don't yet handle the case
> > where
> > > > all
> > > > > of the request bytes could not be  written synchronously. If I were
> > to
> > > > > handle that properly I would need to register WRITE interest on the
> > > > > SocketChannel on incomplete writes.
> > > > >
> > > > > ii) Whenever the client calls poll() I register read interest on
> the
> > > > socket
> > > > > channel using the selector and make a select call on the underlying
> > > > > selector. If the selector says that the socket channel is
> readable, I
> > > > read
> > > > > all the bytes possible into my response ByteBuffer. If I can read
> the
> > > > first
> > > > > 4 bytes I know how big the response is. So I wait till all response
> > > bytes
> > > > > have been read before attempting to parse it. This might take
> > multiple
> > > > poll
> > > > > calls. Once I have received the expected number of bytes I iterate
> > > > through
> > > > > > the response ByteBuffer. There is no random access provided
> to
> > > the
> > > > > ByteBuffer since there are way too many variable length fields.
> This
> > is a
> > > > > DISADVANTAGE of doing flyweight style parsing instead of
> > > deserialization
> > > > > into POJOs with indexed data structures like maps.  I could build
> > > > indexing
> > > > > with a non-allocating primitive hashmap like the koloboke one
> mapping
> > > > > Topic/Partition/Messages to offsets in the ByteBuffer if really
> > > required.
> > > > > For me the lack of random access has not been a problem at all.
> > > Iteration
> > > > > is done just by providing a ByteBuffer and offset (within
> ByteBuffer)
> > > > pair
> > > > > for each message. In my own application I wrap this ByteBuffer and
> > > offset
> > > > > pair in my own flyweight which knows how to decode the data.
> > > > >
> > > > > iii) Once an entire response has been iterated through I can re-use
> > > both
> > > > > the request as well as response buffers.
> > > > >
> > > > > I am sure this can be improved upon a lot.  I allocate the
> following
> > > > before
> > > > > starting my client:
> > > > >     i) A Direct ByteBuffer for requests.
> > > > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers
> > are
> > > > > chosen so that they can fit the biggest request/responses we
> expect.
> > > > > >   iii) A flyweight that wraps the Request ByteBuffer and can be
> used
> > to
> > > > > > write a particular kind of request.  So far I have only written a
> > > > > flyweight
> > > > > > for a FetchRequest.
> > > > > >   iv) A flyweight that wraps the Response ByteBuffer and can be
> used
> > > > to
> > > > > > iterate through the entire response including finding errors. There
> > is
> > > > no
> > > > > > random access allowed right now. So far I have only written a
> > flyweight
> > > > > > parser for a FetchResponse.
> > > > > >   v) A flyweight for every type of application level message that
> I
> > > > > > expect. Iterating through the response ByteBuffer using the
> flyweight
> > > > in
> > > > > > (iv) I get offsets into the ByteBuffer for each individual
> message.
> > My
> > > > own
> > > > > messages work by using absolute position getLong(position),
> > > > > getShort(position) etc calls on a ByteBuffer, so this works out
> great
> > > for
> > > > > > me. We could alternatively provide an API that allocates a new
> > > > > ByteBuffer and copies the data for people who don't want the zero
> > > > > allocation access.
> > > > >
> > > > > Sadly in my profiling I noticed that selector implementation in the
> > > > > JDK allocates
> > > > > but it seems like projects like Netty, Aeron have worked around
> this
> > by
> > > > > using reflection to replace the underlying implementation to make
> it
> > > > > non-allocating. Other than that I have absolutely zero allocations
> > past
> > > > the
> > > > > initial 4-5 allocations. I also have absolutely zero copies in user
> > > space
> > > > > once the data lands from the socket onto the ByteBuffer.
> > > > >
> > > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <
> rajiv@signalfuse.com
> > >
> > > > > wrote:
> > > > >
> > > > > > I had a few more thoughts on the new API. Currently we use kafka
> to
> > > > > > transfer really compact messages - around 25-35 bytes each. Our
> use
> > > > case
> > > > > is
> > > > > > a lot of messages but each very small. Will it be possible to do
> > the
> > > > > > following
> > > > > > to reuse a ConsumerRecord and the ConsumerRecords objects? We
> > employ
> > > > our
> > > > > > own binary encoding to encode our messages and it is encoded as a
> > > > > flyweight
> > > > > > in SBE/Cap'n'Proto style and can be consumed without any
> > > > > > decoding/deserialization step. Currently we use the
> SimpleConsumer
> > > and
> > > > > have
> > > > > > found the fact that it hands out ByteBuffers very useful instead
> > of a
> > > > > > mandatory deserialization into a POJO step. Even then through
> > memory
> > > > > > profiling we have found out that the ByteBuffers and the records
> > take
> > > > > more
> > > > > > space than the actual messages themselves. Ideally we can
> allocate
> > a
> > > > big
> > > > > > ByteBuffer (maybe kafka already does it) to receive our data and
> > then
> > > > > just
> > > > > > get some kind of a flyweight iterator on the ByteBuffer something
> > > like
> > > > > this:
> > > > > >
> > > > > > // Allocate a single ByteBuffer or have kafka allocate this
> > > internally.
> > > > > > Either way it would be very useful to just keep re-using this
> > buffer.
> > > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > > // Create a couple of flyweights.
> > > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > > // Subscribe to the topics we care about.
> > > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > > while (running) {
> > > > > >   // Use the buffer to get data from the server. Additionally
> > re-use
> > > > the
> > > > > > same iterator.
> > > > > >   // Resets the iterator to point to the start of the ByteBuffer.
> > > > > >   consumer.poll(buffer, iterator, timeout);
> > > > > >   // Now the iterator has pointers to the ByteBuffer and is
> capable
> > > of
> > > > > > advancing the cursor to read every message.
> > > > > >   while (iterator.hasNext()) {
> > > > > >     // The consumer record is just a flyweight over a ByteBuffer
> > and
> > > is
> > > > > > adjusted to point to the start of the next record.
> > > > > >     iterator.getNextInto(record);
> > > > > > >     // This is not a new ByteBuffer, it's just the big buffer with
> > its
> > > > > > position and limit adjusted so that we only read the current
> > record.
> > > > > >     // Alternatively we could give it our own ByteBuffer, but the
> > > point
> > > > > > would be to not do a copy and instead adjust the supplied
> > > > > >     // ByteBuffer's position and limit and pointer (through
> > > reflection)
> > > > > to
> > > > > > point to the right slice of the actual big ByteBuffer.
> > > > > >     // This means that we cannot stash this buffer in a hash map
> or
> > > any
> > > > > > > heap allocated structure since its contents keep changing as we
> > > > iterate.
> > > > > >     ByteBuffer buffer = record.getUnderlying();
> > > > > >     process(buffer);  // Process cannot keep a reference to the
> > > buffer
> > > > -
> > > > > > this is really the programmer's responsibility.
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > Given how the new consumer is meant to be used from a single
> thread
> > > - I
> > > > > > think these optional non-allocating methods will be a great boon
> > for
> > > > any
> > > > > > one trying to save memory or prevent heap churn. It has exactly 0
> > > > copies
> > > > > in
> > > > > > user space too which is great for performance. In our case since
> > the
> > > > > > messages are very tiny we end up spending more memory in all the
> > > > wrapper
> > > > > > objects than in the actual messages so this would be a game
> changer
> > > for
> > > > > us.
> > > > > > I'd love to be able to contribute if this seems sensible.
> Hopefully
> > > > these
> > > > > > non-allocating methods can co-exist with the allocating ones and
> > only
> > > > > users
> > > > > > who absolutely need to use them can make the trade-off  of better
> > > > > > efficiency/performance for a slightly more error-prone and ugly
> > API.
> > > > > >
> > > > > > Thoughts?
> > > > > >
> > > > > > Thanks,
> > > > > > Rajiv
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > > <altergzj@yahoo.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > >> Hi all, The document is very beautiful and the Kafka release
> > version
> > > > for
> > > > > >> this will be? and what is the timeline?
> > > > > > >> Thanks, Edwin
> > > > > >>
> > > > > >>
> > > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > > >> rajiv@signalfuse.com> wrote:
> > > > > >>
> > > > > >>
> > > > > >>  Awesome - can't wait for this version to be out!
> > > > > >>
> > > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <
> jay.kreps@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >> > The timeout in the poll call is more or less the timeout used
> by
> > > the
> > > > > >> > selector. So each call to poll will do socket activity on any
> > > ready
> > > > > >> > sockets, waiting for up to that time for a socket to be ready.
> > > There
> > > > > is
> > > > > >> no
> > > > > >> > longer any background threads involved in the consumer, all
> > > activity
> > > > > is
> > > > > >> > driven by the application thread(s).
> > > > > >> >
> > > > > >> > The max fetch request wait time is controlled with a config
> and
> > is
> > > > > >> > independent of the time given to poll.
> > > > > >> >
> > > > > >> > -Jay
> > > > > >> >
> > > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > > rajiv@signalfuse.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > I am trying to understand the semantics of the timeout
> > specified
> > > > in
> > > > > >> the
> > > > > >> > > poll method in
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >> > > .
> > > > > >> > > Is this timeout a measure of how long the fetch request will
> > be
> > > > > >> parked on
> > > > > >> > > the broker waiting for a reply or is this more like the
> > timeout
> > > in
> > > > > >> > > selector.select(long timeout) i.e. the method will return
> with
> > > > > >> whatever
> > > > > >> > > data is there after waiting a maximum of timeout. Exposing
> the
> > > > > >> selector
> > > > > >> > > timeout will be very helpful for us because we want to put a
> > > tight
> > > > > >> bound
> > > > > >> > on
> > > > > >> > > how long we are ready to wait on the poll call. When this
> API
> > is
> > > > > >> > available
> > > > > >> > > we plan to use a single thread to get data from kafka,
> process
> > > > them
> > > > > as
> > > > > >> > well
> > > > > >> > > as run periodic jobs. For the periodic jobs to run we need a
> > > > > >> guarantee on
> > > > > >> > > how much time the poll call can take at most.
> > > > > >> > >
> > > > > >> > > Thanks!
> > > > > >> > >
> > > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > > rajiv@signalfuse.com
> > > > > >
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Thanks!
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <
> jay.kreps@gmail.com
> > >
> > > > > wrote:
> > > > > >> > > >
> > > > > >> > > >> Err, here:
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >> > > >>
> > > > > >> > > >> -Jay
> > > > > >> > > >>
> > > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > > jay.kreps@gmail.com>
> > > > > >> > wrote:
> > > > > >> > > >>
> > > > > >> > > >> > The current work in progress is documented here:
> > > > > >> > > >> >
> > > > > >> > > >> >
> > > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > > >> rajiv@signalfuse.com
> > > > > >> > >
> > > > > >> > > >> > wrote:
> > > > > >> > > >> >
> > > > > >> > > >> >> Is there a link to the proposed new consumer
> > non-blocking
> > > > API?
> > > > > >> > > >> >>
> > > > > >> > > >> >> Thanks,
> > > > > >> > > >> >> Rajiv
> > > > > >> > > >> >>
> > > > > >> > > >> >
> > > > > >> > > >> >
> > > > > >> > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka 0.9 consumer API

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Rajiv,

Just to clarify: the main motivation for iterating over the byte buffer
directly, instead of iterating over the records, is to avoid forcing
de-serialization, right? I think that can be done by passing the
deserializer class info into the consumer record instead of putting the
de-serialized objects into it, and only doing de-serialization on-the-fly
when the key/value fields are requested, while exposing another API that
returns the underlying raw bytes from the record. What do you think?
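
A minimal sketch of this idea, under stated assumptions: LazyRecord, its
rawValue()/value() methods and the use of java.util.function.Function as a
stand-in for a deserializer class are all hypothetical names chosen for
illustration, not the actual ConsumerRecord API.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical record type (not Kafka's ConsumerRecord): it keeps the raw
// bytes plus a deserializer and only decodes when value() is first called.
final class LazyRecord<V> {
    private final ByteBuffer raw;                        // read-only view of the payload
    private final Function<ByteBuffer, V> deserializer;  // stand-in for a deserializer class
    private V value;
    private boolean decoded;

    LazyRecord(ByteBuffer raw, Function<ByteBuffer, V> deserializer) {
        this.raw = raw.asReadOnlyBuffer();
        this.deserializer = deserializer;
    }

    // Returns a view of the underlying bytes without deserializing anything.
    ByteBuffer rawValue() {
        return raw.duplicate();
    }

    // Deserializes on first access and caches the result.
    V value() {
        if (!decoded) {
            value = deserializer.apply(raw.duplicate());
            decoded = true;
        }
        return value;
    }

    boolean isDecoded() {
        return decoded;
    }
}
```

An application that can work on the bytes directly would only ever call
rawValue() and never pay for deserialization. Note that duplicate() still
allocates a small view object per call, so a truly zero-allocation variant
would have to hand out one shared view instead.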

As for de-compression, today we allocate a new buffer for each de-compressed
message, so we may need to do de-compression into a re-usable buffer with
memory control. I will create a ticket under KAFKA-1326 for that.
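
As a rough illustration of de-compressing into a re-usable buffer, here is a
sketch that uses java.util.zip.Inflater from the JDK as a stand-in codec.
ReusableDecompressor is a hypothetical name, the fixed maximum size is an
assumption, and this is not how the Kafka client is implemented.

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

// Hypothetical helper: inflates every message set into one preallocated
// scratch array instead of allocating a new buffer per call.
final class ReusableDecompressor {
    private final Inflater inflater = new Inflater();
    private final byte[] scratch;   // allocated once, up front
    private final ByteBuffer view;  // single reusable view over scratch

    ReusableDecompressor(int maxDecompressedBytes) {
        this.scratch = new byte[maxDecompressedBytes];
        this.view = ByteBuffer.wrap(scratch);
    }

    // Inflates compressed[offset, offset + length) into the scratch array.
    // The returned buffer is only valid until the next call to decompress().
    ByteBuffer decompress(byte[] compressed, int offset, int length) {
        inflater.reset();
        inflater.setInput(compressed, offset, length);
        int written = 0;
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(scratch, written, scratch.length - written);
                written += n;
                if (n == 0 && !inflater.finished()) {
                    throw new IllegalStateException(
                        "scratch buffer too small or compressed input truncated");
                }
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed data", e);
        }
        view.clear();
        view.limit(written);
        return view;
    }
}
```

The trade-off is the same as with the flyweight iteration discussed in the
thread: the caller must finish with the returned buffer before asking for the
next message set, because its contents are overwritten on the next call.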

Guozhang



On Sun, Mar 22, 2015 at 1:22 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi Guozhang,
>
> Thanks for the note. So if we do not deserialize till the last moment like
> Jay suggested we would not need extra buffers for deserialization. Unless
> we need random access to messages it seems like we can deserialize right at
> the time of iteration and allocate objects only if the Consumer actually
> requires deserialized objects. Some applications like mine can just utilize
> the ByteBuffer contents and in those cases we can take in a ByteBuffer or a
> ConsumerRecord which points to a ByteBuffer and make it point to the right
> slice of our response buffer. This ByteBuffer can be re-used over and over
> again.
>
> As for decompression - that will require an extra buffer allocated at
> startup, but not new allocations per response. I imagine the steps would be
> something like this:
>
> i) Receive entire response which includes possibly compressed messages on
> ResponseBuffer. ResponseBuffer is only allocated once.
> ii) Once we have received an entire response, return an iterator to the
> underlying ByteBuffer.
> iii) On each message set check if it is compressed or not.
> iv) If compressed decompress only that message set in a streaming manner
> and store the result in another preallocated buffer
> DecompressedMessagesBuffer. When the consumer asks for the next message
> alter the flyweight that it supplies you to point to the correct slice in
> the DecompressedMessagesBuffer. Do this till all decompressed messages from
> that message set are consumed.
> v) If the message set was not compressed, then alter the flyweight to point
> to the correct slice in the ResponseBuffer. Do this till all messages from
> this message set are consumed.
>
> So basically we will switch between the two buffers when messages are
> compressed or otherwise stay only on the ResponseBuffer. With the lazy
> approach another nice thing is we can do CRC validation right before the
> message gets consumed too. Which means that the CRC algorithm will scan
> some bytes which will be hot in cache (since we are iterating a linear
> array of bytes) and as soon as we have checked the bytes they will be
> consumed by the application which means they will stay hot in L1 cache.
>
> All of this "theory crafting" is based on my currently incomplete
> understanding of the kafka protocol but it seems like compression is per
> message set so we can stream though. Also since in general we can iterate
> through the response buffer, we can do CRC validation right before the
> message is consumed.
>
> Thanks,
> Rajiv
>
> On Sun, Mar 22, 2015 at 10:29 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > Rajiv,
> >
> > A side note for re-using ByteBuffer: in the new consumer we do plan to
> add
> > some memory management module such that it will try to reuse allocated
> > buffer for fetch responses. But as Jay mentioned, for now inside the
> poll()
> > call de-serialization and de-compression is done which requires to
> allocate
> > another buffer to write the de-serialized and de-compressed bytes, hence
> > even with fetch response buffer management today we still need to
> allocate
> > new buffers if compressed messages are delivered.
> >
> > Guozhang
> >
> > On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > Zijing, the new consumer will be in the next release. We don't have a
> > hard
> > > date for this yet.
> > >
> > > Rajiv, I'm game if we can show a >= 20% performance improvement. It
> > > certainly could be an improvement, but it might also be that the CRC
> > > validation and compression dominate.
> > >
> > > The first step would be
> > >   https://issues.apache.org/jira/browse/KAFKA-1895
> > >
> > > This would delay the deserialization of ConsumerRecords and make it
> > > basically a wrapper around the actual MemoryRecords chunks which are
> > > basically ByteBuffer instances. We could then add an optional
> > > ConsumerRecords param in poll, to allow you to hand back your
> > > ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
> > > would also be possible.
> > >
> > > That JIRA is actually useful irrespective of these optimizations though
> > > because deferring the deserialization and decompression would allow you
> > to
> > > punt that work into a pool of processor threads. Since it is more
> common
> > to
> > > see the real bottleneck be application serialization this could be
> > > valuable.
> > >
> > > WRT the object reuse I wouldn't be shocked to learn that you actually
> get
> > > equivalent stuff out of the jvm allocator's own pooling and/or escape
> > > analysis once we are doing the allocation on demand. So it would be
> good
> > to
> > > show a real performance improvement on the newer JVMs before deciding
> to
> > go
> > > this route.
> > >
> > > -Jay
> > >
> > >
> > > On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > > Just a follow up - I have implemented a pretty hacky prototype. It's
> too
> > > > unclean to share right now but I can clean it up if you are
> > interested. I
> > > > don't think it offers anything that people already don't know about
> > > though.
> > > >
> > > > My prototype doesn't do any metadata requests yet but I have a
> > flyweight
> > > > builder/parser of the FetchRequest and the FetchResponse protocol
> that
> > I
> > > > based on the protocol page at
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > .
> > > > On my consumer thread I allocate two buffers per broker a small
> request
> > > > buffer and a bigger response buffer. My assumption is that the
> response
> > > > buffer will be big enough that it is never too small for the
> response.
> > > If
> > > > this assumption doesn't hold then we would have to reallocate a
> bigger
> > > > buffer. These are the steps I am following right now:
> > > >
> > > > i) Whenever a client makes a request I fill up the relevant request
> > > > buffer(s) using my flyweight object instead of allocating an object
> and
> > > > then serializing it. I then write this buffer to the SocketChannel
> > > > connected to the right Kafka broker. I don't yet handle the case
> where
> > > all
> > > > of the request bytes could not be  written synchronously. If I were
> to
> > > > handle that properly I would need to register WRITE interest on the
> > > > SocketChannel on incomplete writes.
> > > >
> > > > ii) Whenever the client calls poll() I register read interest on the
> > > socket
> > > > channel using the selector and make a select call on the underlying
> > > > selector. If the selector says that the socket channel is readable, I
> > > read
> > > > all the bytes possible into my response ByteBuffer. If I can read the
> > > first
> > > > 4 bytes I know how big the response is. So I wait till all response
> > bytes
> > > > have been read before attempting to parse it. This might take
> multiple
> > > poll
> > > > calls. Once I have received the expected number of bytes I iterate
> > > through
> > > > > the response ByteBuffer. There is no random access provided to
> > the
> > > > ByteBuffer since there are way too many variable length fields. This
> is a
> > > > DISADVANTAGE of doing flyweight style parsing instead of
> > deserialization
> > > > into POJOs with indexed data structures like maps.  I could build
> > > indexing
> > > > with a non-allocating primitive hashmap like the koloboke one mapping
> > > > Topic/Partition/Messages to offsets in the ByteBuffer if really
> > required.
> > > > For me the lack of random access has not been a problem at all.
> > Iteration
> > > > is done just by providing a ByteBuffer and offset (within ByteBuffer)
> > > pair
> > > > for each message. In my own application I wrap this ByteBuffer and
> > offset
> > > > pair in my own flyweight which knows how to decode the data.
> > > >
> > > > iii) Once an entire response has been iterated through I can re-use
> > both
> > > > the request as well as response buffers.
> > > >
> > > > I am sure this can be improved upon a lot.  I allocate the following
> > > before
> > > > starting my client:
> > > >     i) A Direct ByteBuffer for requests.
> > > >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers
> are
> > > > chosen so that they can fit the biggest request/responses we expect.
> > > > >   iii) A flyweight that wraps the Request ByteBuffer and can be used
> to
> > > > > write a particular kind of request.  So far I have only written a
> > > > flyweight
> > > > > for a FetchRequest.
> > > > >   iv) A flyweight that wraps the Response ByteBuffer and can be used
> > to
> > > > > iterate through the entire response including finding errors. There
> is
> > no
> > > > > random access allowed right now. So far I have only written a
> flyweight
> > > > > parser for a FetchResponse.
> > > > >   v) A flyweight for every type of application level message that I
> > > > > expect. Iterating through the response ByteBuffer using the flyweight
> > in
> > > > > (iv) I get offsets into the ByteBuffer for each individual message.
> My
> > > own
> > > > messages work by using absolute position getLong(position),
> > > > getShort(position) etc calls on a ByteBuffer, so this works out great
> > for
> > > > > me. We could alternatively provide an API that allocates a new
> > > > ByteBuffer and copies the data for people who don't want the zero
> > > > allocation access.
> > > >
> > > > Sadly in my profiling I noticed that selector implementation in the
> > > > JDK allocates
> > > > but it seems like projects like Netty, Aeron have worked around this
> by
> > > > using reflection to replace the underlying implementation to make it
> > > > non-allocating. Other than that I have absolutely zero allocations
> past
> > > the
> > > > initial 4-5 allocations. I also have absolutely zero copies in user
> > space
> > > > once the data lands from the socket onto the ByteBuffer.
> > > >
> > > > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <rajiv@signalfuse.com
> >
> > > > wrote:
> > > >
> > > > > I had a few more thoughts on the new API. Currently we use kafka to
> > > > > transfer really compact messages - around 25-35 bytes each. Our use
> > > case
> > > > is
> > > > > a lot of messages but each very small. Will it be possible to do
> the
> > > > > following
> > > > > to reuse a ConsumerRecord and the ConsumerRecords objects? We
> employ
> > > our
> > > > > own binary encoding to encode our messages and it is encoded as a
> > > > flyweight
> > > > > in SBE/Cap'n'Proto style and can be consumed without any
> > > > > decoding/deserialization step. Currently we use the SimpleConsumer
> > and
> > > > have
> > > > > found the fact that it hands out ByteBuffers very useful instead
> of a
> > > > > mandatory deserialization into a POJO step. Even then through
> memory
> > > > > profiling we have found out that the ByteBuffers and the records
> take
> > > > more
> > > > > space than the actual messages themselves. Ideally we can allocate
> a
> > > big
> > > > > ByteBuffer (maybe kafka already does it) to receive our data and
> then
> > > > just
> > > > > get some kind of a flyweight iterator on the ByteBuffer something
> > like
> > > > this:
> > > > >
> > > > > // Allocate a single ByteBuffer or have kafka allocate this
> > internally.
> > > > > Either way it would be very useful to just keep re-using this
> buffer.
> > > > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > > > > // Create a couple of flyweights.
> > > > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > > > ConsumerRecord record = new ConsumerRecord();
> > > > > // Subscribe to the topics we care about.
> > > > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > > > while (running) {
> > > > >   // Use the buffer to get data from the server. Additionally
> re-use
> > > the
> > > > > same iterator.
> > > > >   // Resets the iterator to point to the start of the ByteBuffer.
> > > > >   consumer.poll(buffer, iterator, timeout);
> > > > >   // Now the iterator has pointers to the ByteBuffer and is capable
> > of
> > > > > advancing the cursor to read every message.
> > > > >   while (iterator.hasNext()) {
> > > > >     // The consumer record is just a flyweight over a ByteBuffer
> and
> > is
> > > > > adjusted to point to the start of the next record.
> > > > >     iterator.getNextInto(record);
> > > > > >     // This is not a new ByteBuffer, it's just the big buffer with
> its
> > > > > position and limit adjusted so that we only read the current
> record.
> > > > >     // Alternatively we could give it our own ByteBuffer, but the
> > point
> > > > > would be to not do a copy and instead adjust the supplied
> > > > >     // ByteBuffer's position and limit and pointer (through
> > reflection)
> > > > to
> > > > > point to the right slice of the actual big ByteBuffer.
> > > > >     // This means that we cannot stash this buffer in a hash map or
> > any
> > > > > > heap allocated structure since its contents keep changing as we
> > > iterate.
> > > > >     ByteBuffer buffer = record.getUnderlying();
> > > > >     process(buffer);  // Process cannot keep a reference to the
> > buffer
> > > -
> > > > > this is really the programmer's responsibility.
> > > > >   }
> > > > > }
> > > > >
> > > > > Given how the new consumer is meant to be used from a single thread
> > - I
> > > > > think these optional non-allocating methods will be a great boon
> for
> > > any
> > > > > one trying to save memory or prevent heap churn. It has exactly 0
> > > copies
> > > > in
> > > > > user space too which is great for performance. In our case since
> the
> > > > > messages are very tiny we end up spending more memory in all the
> > > wrapper
> > > > > objects than in the actual messages so this would be a game changer
> > for
> > > > us.
> > > > > I'd love to be able to contribute if this seems sensible. Hopefully
> > > these
> > > > > non-allocating methods can co-exist with the allocating ones and
> only
> > > > users
> > > > > who absolutely need to use them can make the trade-off  of better
> > > > > efficiency/performance for a slightly more error-prone and ugly
> API.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Thanks,
> > > > > Rajiv
> > > > >
> > > > >
> > > > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo
> > <altergzj@yahoo.com.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > >> Hi all, The document is very beautiful and the Kafka release
> version
> > > for
> > > > >> this will be? and what is the timeline?
> > > > > >> Thanks, Edwin
> > > > >>
> > > > >>
> > > > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > > > >> rajiv@signalfuse.com> wrote:
> > > > >>
> > > > >>
> > > > >>  Awesome - can't wait for this version to be out!
> > > > >>
> > > > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >> > The timeout in the poll call is more or less the timeout used by
> > the
> > > > >> > selector. So each call to poll will do socket activity on any
> > ready
> > > > >> > sockets, waiting for up to that time for a socket to be ready.
> > There
> > > > is
> > > > >> no
> > > > >> > longer any background threads involved in the consumer, all
> > activity
> > > > is
> > > > >> > driven by the application thread(s).
> > > > >> >
> > > > >> > The max fetch request wait time is controlled with a config and
> is
> > > > >> > independent of the time given to poll.
> > > > >> >
> > > > >> > -Jay
> > > > >> >
> > > > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> > > rajiv@signalfuse.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > I am trying to understand the semantics of the timeout
> specified
> > > in
> > > > >> the
> > > > >> > > poll method in
> > > > >> > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >> > > .
> > > > >> > > Is this timeout a measure of how long the fetch request will
> be
> > > > >> parked on
> > > > >> > > the broker waiting for a reply or is this more like the
> timeout
> > in
> > > > >> > > selector.select(long timeout) i.e. the method will return with
> > > > >> whatever
> > > > >> > > data is there after waiting a maximum of timeout. Exposing the
> > > > >> selector
> > > > >> > > timeout will be very helpful for us because we want to put a
> > tight
> > > > >> bound
> > > > >> > on
> > > > >> > > how long we are ready to wait on the poll call. When this API
> is
> > > > >> > available
> > > > >> > > we plan to use a single thread to get data from kafka, process
> > > them
> > > > as
> > > > >> > well
> > > > >> > > as run periodic jobs. For the periodic jobs to run we need a
> > > > >> guarantee on
> > > > >> > > how much time the poll call can take at most.
> > > > >> > >
> > > > >> > > Thanks!
> > > > >> > >
> > > > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> > > rajiv@signalfuse.com
> > > > >
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Thanks!
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Thursday, March 19, 2015, Jay Kreps <jay.kreps@gmail.com
> >
> > > > wrote:
> > > > >> > > >
> > > > >> > > >> Err, here:
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >> > > >>
> > > > >> > > >> -Jay
> > > > >> > > >>
> > > > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> > > jay.kreps@gmail.com>
> > > > >> > wrote:
> > > > >> > > >>
> > > > >> > > >> > The current work in progress is documented here:
> > > > >> > > >> >
> > > > >> > > >> >
> > > > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > > > >> rajiv@signalfuse.com
> > > > >> > >
> > > > >> > > >> > wrote:
> > > > >> > > >> >
> > > > >> > > >> >> Is there a link to the proposed new consumer
> non-blocking
> > > API?
> > > > >> > > >> >>
> > > > >> > > >> >> Thanks,
> > > > >> > > >> >> Rajiv
> > > > >> > > >> >>
> > > > >> > > >> >
> > > > >> > > >> >
> > > > >> > > >>
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Hi Guozhang,

Thanks for the note. If we do not deserialize until the last moment, as Jay
suggested, we would not need extra buffers for deserialization. Unless we
need random access to messages, it seems we can deserialize right at the
time of iteration and allocate objects only if the consumer actually
requires deserialized objects. Some applications, like mine, can use the
ByteBuffer contents directly. In those cases we can take in a ByteBuffer, or
a ConsumerRecord that points to a ByteBuffer, and make it point to the right
slice of our response buffer. This ByteBuffer can be re-used over and over
again.

As for decompression - that will require an extra buffer allocated at
startup, but not new allocations per response. I imagine the steps would be
something like this:

i) Receive the entire response, which may include compressed messages, into
ResponseBuffer. ResponseBuffer is allocated only once.
ii) Once the entire response has been received, return an iterator over the
underlying ByteBuffer.
iii) For each message set, check whether it is compressed.
iv) If it is compressed, decompress only that message set in a streaming
manner and store the result in another preallocated buffer,
DecompressedMessagesBuffer. When the consumer asks for the next message,
alter the flyweight that it supplies to point to the correct slice in
DecompressedMessagesBuffer. Do this until all decompressed messages from
that message set are consumed.
v) If the message set is not compressed, alter the flyweight to point to the
correct slice in ResponseBuffer. Do this until all messages from this
message set are consumed.
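
To make the steps above concrete, here is a rough sketch of the iteration.
It is only an illustration under stated assumptions: the framing is a toy
layout I made up for the example (it is not the Kafka wire format),
java.util.zip.Inflater stands in for whatever codec the broker used, heap
buffers are used so that array() works, and MessageFlyweight and
LazyMessageIterator are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

/** Reusable view over one message; it is re-pointed, never re-allocated. */
final class MessageFlyweight {
    ByteBuffer buffer;
    int offset, length;

    void wrap(ByteBuffer b, int off, int len) { buffer = b; offset = off; length = len; }
}

/**
 * Toy framing, NOT the Kafka wire format (an assumption for this sketch):
 *   set     := [1-byte compressed flag][4-byte payload length][payload]
 *   payload := ([4-byte message length][message bytes])*
 */
final class LazyMessageIterator {
    private final ByteBuffer decompressed;   // DecompressedMessagesBuffer, allocated once
    private final Inflater inflater = new Inflater();
    private ByteBuffer response;             // ResponseBuffer, owned by the caller
    private int setPos, responseEnd;         // cursor over message sets
    private ByteBuffer current;              // buffer holding the current set's messages
    private int msgPos, msgEnd;              // cursor over messages in 'current'

    LazyMessageIterator(int maxDecompressedBytes) {
        decompressed = ByteBuffer.allocate(maxDecompressedBytes);
    }

    /** Starts iterating over the first 'length' bytes of a fully received response. */
    void reset(ByteBuffer response, int length) {
        this.response = response;
        setPos = msgPos = msgEnd = 0;
        responseEnd = length;
    }

    /** Points 'out' at the next message; returns false once the response is exhausted. */
    boolean next(MessageFlyweight out) {
        while (msgPos >= msgEnd) {           // current set drained: open the next one
            if (setPos >= responseEnd) return false;
            boolean compressed = response.get(setPos) == 1;
            int setLen = response.getInt(setPos + 1);
            int payload = setPos + 5;
            setPos = payload + setLen;
            if (compressed) {                // step iv: switch to the decompressed buffer
                current = decompressed;
                msgPos = 0;
                msgEnd = inflate(payload, setLen);
            } else {                         // step v: stay on the response buffer
                current = response;
                msgPos = payload;
                msgEnd = payload + setLen;
            }
        }
        int msgLen = current.getInt(msgPos);
        out.wrap(current, msgPos + 4, msgLen);
        msgPos += 4 + msgLen;
        return true;
    }

    /** Inflates one message set into the preallocated buffer; returns the byte count. */
    private int inflate(int offset, int length) {
        byte[] dst = decompressed.array();
        inflater.reset();
        inflater.setInput(response.array(), offset, length);
        int n = 0;
        try {
            while (!inflater.finished()) {
                int r = inflater.inflate(dst, n, dst.length - n);
                if (r == 0 && !inflater.finished()) {
                    throw new IllegalStateException("truncated input or buffer too small");
                }
                n += r;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        }
        return n;
    }
}
```

The caller would call reset() once per complete response and then loop on
next(msg) with a single MessageFlyweight, reading the bytes in place. After
construction nothing is allocated per message. The flyweight must not be
stashed anywhere, since it is re-pointed on every call.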

So basically we will switch between the two buffers when messages are
compressed, and otherwise stay on the ResponseBuffer. Another nice thing
about the lazy approach is that we can do CRC validation right before the
message gets consumed. The CRC algorithm then scans bytes that are likely
hot in cache (since we are iterating over a linear array of bytes), and the
application consumes them as soon as they have been checked, so they should
still be hot in L1 cache.

All of this "theory crafting" is based on my currently incomplete
understanding of the Kafka protocol, but it seems that compression is per
message set, so we can stream through. Also, since in general we can iterate
through the response buffer, we can do CRC validation right before the
message is consumed.
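
A small sketch of the "validate right before consuming" idea, again under
assumptions: the record layout below is a toy one invented for the example
(not the real Kafka message format), the buffer is a heap buffer, and
LazyCrc and PayloadHandler are hypothetical names. java.util.zip.CRC32 is
the standard JDK class, and a single instance is reused for every message.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Callback that reads a payload in place; it must not retain the buffer. */
interface PayloadHandler {
    void onPayload(ByteBuffer buffer, int offset, int length);
}

final class LazyCrc {
    // Toy layout, NOT the Kafka wire format (an assumption for this sketch):
    //   [4-byte CRC32 of payload][4-byte payload length][payload]
    static void append(ByteBuffer out, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        out.putInt((int) crc.getValue()).putInt(payload.length).put(payload);
    }

    /**
     * Walks the first 'length' bytes of a heap buffer in order. Each CRC is
     * checked immediately before its payload is handed to the handler, so the
     * bytes are touched twice in quick succession. Returns the message count.
     */
    static int consume(ByteBuffer in, int length, CRC32 crc, PayloadHandler handler) {
        int pos = 0;
        int count = 0;
        while (pos < length) {
            int expected = in.getInt(pos);
            int payloadLength = in.getInt(pos + 4);
            int payload = pos + 8;
            crc.reset();
            crc.update(in.array(), payload, payloadLength);
            if ((int) crc.getValue() != expected) {
                throw new IllegalStateException("CRC mismatch at offset " + pos);
            }
            handler.onPayload(in, payload, payloadLength);
            pos = payload + payloadLength;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        append(buffer, new byte[] {1, 2, 3});
        append(buffer, new byte[] {4});
        CRC32 crc = new CRC32();   // one instance, reused for every message
        int n = consume(buffer, buffer.position(), crc,
                (b, off, len) -> System.out.println("payload at " + off + ", " + len + " bytes"));
        System.out.println(n + " messages validated");
    }
}
```

Whether this ordering actually buys anything over validating the whole
response up front would need to be measured.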

Thanks,
Rajiv


Re: Kafka 0.9 consumer API

Posted by Guozhang Wang <wa...@gmail.com>.
Rajiv,

A side note on re-using ByteBuffers: in the new consumer we do plan to add a
memory management module that will try to reuse allocated buffers for fetch
responses. But as Jay mentioned, for now deserialization and decompression
are done inside the poll() call, which requires allocating another buffer to
write the deserialized and decompressed bytes. Hence, even with fetch
response buffer management, today we would still need to allocate new
buffers if compressed messages are delivered.
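
To illustrate the buffer reuse idea, here is a minimal sketch of such a
pool. It is an illustration only, not the memory management module planned
for the consumer: FetchBufferPool and its methods are hypothetical names,
and it assumes fixed-size buffers and a single application thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Fixed-size buffer pool; single-threaded by design, like the poll loop. */
final class FetchBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    FetchBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Returns a cleared buffer, allocating only when the pool is empty. */
    ByteBuffer acquire() {
        ByteBuffer buffer = free.pollFirst();
        return buffer != null ? buffer : ByteBuffer.allocate(bufferSize);
    }

    /** Hands a buffer back once its fetch response has been fully consumed. */
    void release(ByteBuffer buffer) {
        if (buffer.capacity() != bufferSize) {
            return;               // not one of ours; leave it to the GC
        }
        buffer.clear();           // resets position and limit; old bytes get overwritten
        free.addFirst(buffer);
    }

    public static void main(String[] args) {
        FetchBufferPool pool = new FetchBufferPool(1024 * 1024);
        ByteBuffer first = pool.acquire();
        pool.release(first);
        ByteBuffer second = pool.acquire();
        System.out.println("reused: " + (first == second));
    }
}
```

A pool like this only covers the fetch response itself. The decompression
output would need its own reusable buffer to avoid the extra allocation.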

Guozhang

On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <ja...@gmail.com> wrote:

> Zijing, the new consumer will be in the next release. We don't have a hard
> date for this yet.
>
> Rajiv, I'm game if we can show a >= 20% performance improvement. It
> certainly could be an improvement, but it might also be that the CRC
> validation and compression dominate.
>
> The first step would be
>   https://issues.apache.org/jira/browse/KAFKA-1895
>
> This would delay the deserialization of ConsumerRecords and make it
> basically a wrapper around the actual MemoryRecords chunks which are
> basically ByteBuffer instances. We could then add an optional
> ConsumerRecords param in poll, to allow you to hand back your
> ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
> would also be possible.
>
> That JIRA is actually useful irrespective of these optimizations though
> because deferring the deserialization and decompression would allow you to
> punt that work into a pool of processor threads. Since it is more common to
> see the real bottleneck be application serialization this could be
> valuable.
>
> WRT the object reuse I wouldn't be shocked to learn that you actually get
> equivalent stuff out of the jvm allocator's own pooling and/or escape
> analysis once we are doing the allocation on demand. So it would be good to
> show a real performance improvement on the newer JVMs before deciding to go
> this route.
>
> -Jay
>
>
> On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Just a follow up - I have implemented a pretty hacky prototype. It's too
> > unclean to share right now but I can clean it up if you are interested. I
> > don't think it offers anything that people don't already know about,
> > though.
> >
> > My prototype doesn't do any metadata requests yet, but I have a flyweight
> > builder/parser for the FetchRequest and FetchResponse protocol that I
> > based on the protocol page at
> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > .
> > On my consumer thread I allocate two buffers per broker: a small request
> > buffer and a bigger response buffer. My assumption is that the response
> > buffer will always be big enough for the response. If this assumption
> > doesn't hold, we would have to reallocate a bigger buffer. These are the
> > steps I am following right now:
> >
> > i) Whenever a client makes a request I fill up the relevant request
> > buffer(s) using my flyweight object instead of allocating an object and
> > then serializing it. I then write this buffer to the SocketChannel
> > connected to the right Kafka broker. I don't yet handle the case where
> > not all of the request bytes could be written synchronously. To handle
> > that properly I would need to register WRITE interest on the
> > SocketChannel on incomplete writes.
> >
> > ii) Whenever the client calls poll() I register read interest on the
> > socket channel using the selector and make a select call on the
> > underlying selector. If the selector says that the socket channel is
> > readable, I read all the bytes possible into my response ByteBuffer. Once
> > I have read the first 4 bytes I know how big the response is, so I wait
> > until all response bytes have been read before attempting to parse it.
> > This might take multiple poll calls. Once I have received the expected
> > number of bytes I iterate through the response ByteBuffer. There is no
> > random access provided to the ByteBuffer since there are way too many
> > variable length fields. This is a DISADVANTAGE of doing flyweight style
> > parsing instead of deserialization into POJOs with indexed data
> > structures like maps. If really required, I could build indexing with a
> > non-allocating primitive hashmap, like the Koloboke one, mapping
> > Topic/Partition/Messages to offsets in the ByteBuffer. For me the lack of
> > random access has not been a problem at all. Iteration is done just by
> > providing a ByteBuffer and offset (within the ByteBuffer) pair for each
> > message. In my own application I wrap this ByteBuffer and offset pair in
> > my own flyweight, which knows how to decode the data.
> >
> > iii) Once an entire response has been iterated through I can re-use both
> > the request as well as response buffers.
> >
> > I am sure this can be improved upon a lot. I allocate the following
> > before starting my client:
> >     i) A direct ByteBuffer for requests.
> >    ii) A direct ByteBuffer for responses. Sizes of the ByteBuffers are
> > chosen so that they can fit the biggest requests/responses we expect.
> >   iii) A flyweight that wraps the request ByteBuffer and can be used to
> > write a particular kind of request. So far I have only written a
> > flyweight for a FetchRequest.
> >    iv) A flyweight that wraps the response ByteBuffer and can be used to
> > iterate through the entire response, including finding errors. There is
> > no random access allowed right now. So far I have only written a
> > flyweight parser for a FetchResponse.
> >     v) A flyweight for every type of application level message that I
> > expect. Iterating through the response ByteBuffer using the flyweight in
> > (iv), I get offsets into the ByteBuffer for each individual message. My
> > own messages work by using absolute position getLong(position),
> > getShort(position) etc. calls on a ByteBuffer, so this works out great
> > for me. We could alternatively provide an API that allocates a new
> > ByteBuffer and copies the data for people who don't want the zero
> > allocation access.
> >
> > Sadly, in my profiling I noticed that the selector implementation in the
> > JDK allocates, but it seems that projects like Netty and Aeron have worked
> > around this by using reflection to replace the underlying implementation
> > to make it non-allocating. Other than that I have absolutely zero
> > allocations past the initial 4-5 allocations. I also have absolutely zero
> > copies in user space once the data lands from the socket onto the
> > ByteBuffer.
> >
> > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > I had a few more thoughts on the new API. Currently we use Kafka to
> > > transfer really compact messages - around 25-35 bytes each. Our use
> > > case is a lot of messages, each very small. Would it be possible to do
> > > something like the following to reuse a ConsumerRecord and the
> > > ConsumerRecords objects? We employ our own binary encoding for our
> > > messages; it is encoded as a flyweight in SBE/Cap'n Proto style and can
> > > be consumed without any decoding/deserialization step. Currently we use
> > > the SimpleConsumer and have found the fact that it hands out
> > > ByteBuffers, instead of a mandatory deserialization into a POJO step,
> > > very useful. Even then, through memory profiling we have found out that
> > > the ByteBuffers and the records take more space than the actual
> > > messages themselves. Ideally we could allocate a big ByteBuffer (maybe
> > > Kafka already does this) to receive our data and then just get some
> > > kind of flyweight iterator on the ByteBuffer, something like this:
> > >
> > > // Allocate a single ByteBuffer, or have Kafka allocate this internally.
> > > // Either way it would be very useful to just keep re-using this buffer.
> > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > // Create a couple of flyweights.
> > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > ConsumerRecord record = new ConsumerRecord();
> > > // Subscribe to the topics we care about.
> > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > while (running) {
> > >   // Use the buffer to get data from the server, re-using the same
> > >   // iterator. This resets the iterator to point to the start of the
> > >   // ByteBuffer.
> > >   consumer.poll(buffer, iterator, timeout);
> > >   // Now the iterator has pointers into the ByteBuffer and is capable of
> > >   // advancing the cursor to read every message.
> > >   while (iterator.hasNext()) {
> > >     // The consumer record is just a flyweight over a ByteBuffer and is
> > >     // adjusted to point to the start of the next record.
> > >     iterator.getNextInto(record);
> > >     // This is not a new ByteBuffer, it's just the big buffer with its
> > >     // position and limit adjusted so that we only read the current
> > >     // record. Alternatively we could give it our own ByteBuffer, but the
> > >     // point would be to not do a copy and instead adjust the supplied
> > >     // ByteBuffer's position, limit and pointer (through reflection) to
> > >     // point to the right slice of the actual big ByteBuffer.
> > >     // This means that we cannot stash this buffer in a hash map or any
> > >     // heap allocated structure, since its contents keep changing as we
> > >     // iterate.
> > >     ByteBuffer recordBuffer = record.getUnderlying();
> > >     // process() cannot keep a reference to the buffer - this is really
> > >     // the programmer's responsibility.
> > >     process(recordBuffer);
> > >   }
> > > }
> > >
> > > Given how the new consumer is meant to be used from a single thread, I
> > > think these optional non-allocating methods will be a great boon for
> > > anyone trying to save memory or prevent heap churn. It has exactly 0
> > > copies in user space too, which is great for performance. In our case,
> > > since the messages are very tiny, we end up spending more memory on all
> > > the wrapper objects than on the actual messages, so this would be a game
> > > changer for us. I'd love to be able to contribute if this seems
> > > sensible. Hopefully these non-allocating methods can co-exist with the
> > > allocating ones, and only users who absolutely need them can make the
> > > trade-off of better efficiency/performance for a slightly more
> > > error-prone and ugly API.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Rajiv
> > >
> > >
> > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <altergzj@yahoo.com.invalid
> >
> > > wrote:
> > >
> > >> Hi all, the document is very beautiful. Which Kafka release version
> > >> will this be in, and what is the timeline?
> > >> Thanks,
> > >> Edwin
> > >>
> > >>
> > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > >> rajiv@signalfuse.com> wrote:
> > >>
> > >>
> > >>  Awesome - can't wait for this version to be out!
> > >>
> > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > >>
> > >> > The timeout in the poll call is more or less the timeout used by the
> > >> > selector. So each call to poll will do socket activity on any ready
> > >> > sockets, waiting for up to that time for a socket to be ready. There
> > >> > are no longer any background threads involved in the consumer; all
> > >> > activity is driven by the application thread(s).
> > >> >
> > >> > The max fetch request wait time is controlled with a config and is
> > >> > independent of the time given to poll.
> > >> >
> > >> > -Jay
> > >> >
> > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> rajiv@signalfuse.com>
> > >> > wrote:
> > >> >
> > >> > > I am trying to understand the semantics of the timeout specified in
> > >> > > the poll method in
> > >> > > http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >> > > .
> > >> > > Is this timeout a measure of how long the fetch request will be
> > >> > > parked on the broker waiting for a reply, or is it more like the
> > >> > > timeout in selector.select(long timeout), i.e. the method will
> > >> > > return with whatever data is there after waiting a maximum of
> > >> > > timeout? Exposing the selector timeout will be very helpful for us
> > >> > > because we want to put a tight bound on how long we are ready to
> > >> > > wait on the poll call. When this API is available we plan to use a
> > >> > > single thread to get data from Kafka, process it, and run periodic
> > >> > > jobs. For the periodic jobs to run we need a guarantee on how much
> > >> > > time the poll call can take at most.
> > >> > >
> > >> > > Thanks!
> > >> > >
> > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> rajiv@signalfuse.com
> > >
> > >> > > wrote:
> > >> > >
> > >> > > > Thanks!
> > >> > > >
> > >> > > >
> > >> > > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com>
> > wrote:
> > >> > > >
> > >> > > >> Err, here:
> > >> > > >>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >> > > >>
> > >> > > >> -Jay
> > >> > > >>
> > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> jay.kreps@gmail.com>
> > >> > wrote:
> > >> > > >>
> > >> > > >> > The current work in progress is documented here:
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > >> rajiv@signalfuse.com
> > >> > >
> > >> > > >> > wrote:
> > >> > > >> >
> > >> > > >> >> Is there a link to the proposed new consumer non-blocking
> API?
> > >> > > >> >>
> > >> > > >> >> Thanks,
> > >> > > >> >> Rajiv
> > >> > > >> >>
> > >> > > >> >
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
>



-- 
-- Guozhang

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Thanks for the insight Jay. That seems like a good plan. I'll take a look
at it ASAP.

I have no idea how much things would improve in a general application with
this. Like you said, CRC and decompression could still be the dominant
factor. In my experience, cutting allocation down to zero helps with 99th
percentile latency and above. With GC, even under ideal load, I see the
99th and especially the 99.99th percentile be wildly higher than the
median, 75th or even 95th. Without allocation-related jitter (GC, cache
misses, TLAB misses etc.) these numbers are usually much closer. I am also
sure that zero copy and re-using the same buffers over and over again will
lead to better performance. It should speed up other bottlenecks like CRC
and decompression to some extent too, given these buffers will be sitting
hot in cache.

Right now given our workload of a large number of tiny messages, the
allocation of ByteBuffers and multiple copies adds up quickly and
ByteBuffers end up taking more space than the underlying bytes. I am sure
some other people also have a similar workload of tiny messages. So even
without massive performance improvements we will more than halve our memory
usage by not allocating multiple objects per message.

Thanks.


On Sun, Mar 22, 2015 at 7:56 AM, Jay Kreps <ja...@gmail.com> wrote:

> Zijing, the new consumer will be in the next release. We don't have a hard
> date for this yet.
>
> Rajiv, I'm game if we can show a >= 20% performance improvement. It
> certainly could be an improvement, but it might also be that the CRC
> validation and compression dominate.
>
> The first step would be
>   https://issues.apache.org/jira/browse/KAFKA-1895
>
> This would delay the deserialization of ConsumerRecords and make it
> basically a wrapper around the actual MemoryRecords chunks which are
> basically ByteBuffer instances. We could then add an optional
> ConsumerRecords param in poll, to allow you to hand back your
> ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
> would also be possible.
>
> That JIRA is actually useful irrespective of these optimizations though
> because deferring the deserialization and decompression would allow you to
> punt that work into a pool of processor threads. Since it is more common to
> see the real bottleneck be application serialization this could be
> valuable.
>
> WRT the object reuse I wouldn't be shocked to learn that you actually get
> equivalent stuff out of the jvm allocator's own pooling and/or escape
> analysis once we are doing the allocation on demand. So it would be good to
> show a real performance improvement on the newer JVMs before deciding to go
> this route.
>
> -Jay
>
>
> On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Just a follow up - I have implemented a pretty hacky prototype It's too
> > unclean to share right now but I can clean it up if you are interested. I
> > don't think it offers anything that people already don't know about
> though.
> >
> > My prototype doesn't do any metadata requests yet but I have a flyweight
> > builder/parser of the FetchRequest and the FetchResponse protocol that I
> > based on the protocol page at
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > .
> > On my consumer thread I allocate two buffers per broker a small request
> > buffer and a bigger response buffer. My assumption is that the response
> > buffer will be big enough that it is never too small for the response.
> If
> > this assumption doesn't hold then we would have to reallocate a bigger
> > buffer. These are the steps I am following right now:
> >
> > i) Whenever a client makes a request I fill up the relevant request
> > buffer(s) using my flyweight object instead of allocating an object and
> > then serializing it. I then write this buffer to the SocketChannel
> > connected to the right Kafka broker. I don't yet handle the case where
> all
> > of the request bytes could not be  written synchronously. If I were to
> > handle that properly I would need to register WRITE interest on the
> > SocketChannel on incomplete writes.
> >
> > ii) Whenever the client calls poll() I register read interest on the
> socket
> > channel using the selector and make a select call on the underlying
> > selector. If the selector says that the socket channel is readable, I
> read
> > all the bytes possible into my response ByteBuffer. If I can read the
> first
> > 4 bytes I know how big the response is. So I wait till all response bytes
> > have been read before attempting to parse it. This might take multiple
> poll
> > calls. Once I have received the expected number of bytes I iterate
> through
> > the response ByteBuffer. There is no random access access provided to the
> > ByteBuffer since there are way too many variable length fields. This a
> > DISADVANTAGE of doing flyweight style parsing instead of deserialization
> > into POJOs with indexed data structures like maps.  I could build
> indexing
> > with a non-allocating primitive hashmap like the koloboke one mapping
> > Topic/Partition/Messages to offsets in the ByteBuffer if really required.
> > For me the lack of random access has not been a problem at all. Iteration
> > is done just by providing a ByteBuffer and offset (within ByteBuffer)
> pair
> > for each message. In my own application I wrap this ByteBuffer and offset
> > pair in my own flyweight which knows how to decode the data.
> >
> > iii) Once an entire response has been iterated through I can re-use both
> > the request as well as response buffers.
> >
> > I am sure this can be improved upon a lot.  I allocate the following
> before
> > starting my client:
> >     i) A Direct ByteBuffer for requests.
> >    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers are
> > chosen so that they can fit the biggest request/responses we expect.
> >    ii) A flyweight that wraps the Request ByteBuffer and can be used to
> > write a particular kind of request.  So far I have only written a
> flyweight
> > for a FetchRequest.
> >   iii) A flyweight that wraps the Response ByteBuffer and can be used to
> > iterate through the entire response including finding errors. There is no
> > random access allowed right now. So far I have only written a flyweight
> > parser for a FetchResponse.
> >   iv) A flyweight for every type of application level message that I
> > expect. Iterating through the response ByteBuffer using the flyweight in
> > (iii) I get offsets into the ByteBuffer for each individual message. My
> own
> > messages work by using absolute position getLong(position),
> > getShort(position) etc calls on a ByteBuffer, so this works out great for
> > me. We could alternatively provide an API that does allocates a new
> > ByteBuffer and copies the data for people who don't want the zero
> > allocation access.
> >
> > Sadly in my profiling I noticed that selector implementation in the
> > JDK allocates
> > but it seems like projects like Netty, Aeron have worked around this by
> > using reflection to replace the underlying implementation to make it
> > non-allocating. Other than that I have absolutely zero allocations past
> the
> > initial 4-5 allocations. I also have absolutely zero copies in user space
> > once the data lands from the socket onto the ByteBuffer.
> >
> > On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > I had a few more thoughts on the new API. Currently we use kafka to
> > > transfer really compact messages - around 25-35 bytes each. Our use
> case
> > is
> > > a lot of messages but each very small. Will it be possible to do the
> > > following
> > > to reuse a ConsumerRecord and the ConsumerRecords objects? We employ
> our
> > > own binary encoding to encode our messages and it is encoded as a
> > flyweight
> > > in SBE/Cap'n'Proto style and can be consumed without any
> > > decoding/deserialization step. Currently we use the SimpleConsumer and
> > have
> > > found the fact that it hands out ByteBuffers very useful instead of a
> > > mandatory deserialization into a POJO step. Even then through memory
> > > profiling we have found out that the ByteBuffers and the records take
> > more
> > > space than the actual messages themselves. Ideally we can allocate a
> big
> > > ByteBuffer (maybe kafka already does it) to receive our data and then
> > just
> > > get some kind of a flyweight iterator on the ByteBuffer something like
> > this:
> > >
> > > // Allocate a single ByteBuffer or have kafka allocate this internally.
> > > Either way it would be very useful to just keep re-using this buffer.
> > > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > > // Create a coupe of flyweights.
> > > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > > ConsumerRecord record = new ConsumerRecord();
> > > // Subscribe to the topics we care about.
> > > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > > while (running) {
> > >   // Use the buffer to get data from the server. Additionally  re-use
> the
> > > same iterator.
> > >   // Resets the iterator to point to the start of the ByteBuffer.
> > >   consumer.poll(buffer, iterator, timeout);
> > >   // Now the iterator has pointers to the ByteBuffer and is capable of
> > > advancing the cursor to read every message.
> > >   while (iterator.hasNext()) {
> > >     // The consumer record is just a flyweight over a ByteBuffer and is
> > > adjusted to point to the start of the next record.
> > >     iterator.getNextInto(record);
> > >     // This is not a new ByteBuffer, its just the big buffer with its
> > > position and limit adjusted so that we only read the current record.
> > >     // Alternatively we could give it our own ByteBuffer, but the point
> > > would be to not do a copy and instead adjust the supplied
> > >     // ByteBuffer's position and limit and pointer (through reflection)
> > to
> > > point to the right slice of the actual big ByteBuffer.
> > >     // This means that we cannot stash this buffer in a hash map or any
> > > heap allocated structure since it's contents keep changing as we
> iterate.
> > >     ByteBuffer buffer = record.getUnderlying();
> > >     process(buffer);  // Process cannot keep a reference to the buffer
> -
> > > this is really the programmer's responsibility.
> > >   }
> > > }
> > >
> > > Given how the new consumer is meant to be used from a single thread - I
> > > think these optional non-allocating methods will be a great boon for
> any
> > > one trying to save memory or prevent heap churn. It has exactly 0
> copies
> > in
> > > user space too which is great for performance. In our case since the
> > > messages are very tiny we end up spending more memory in all the
> wrapper
> > > objects than in the actual messages so this would be a game changer for
> > us.
> > > I'd love to be able to contribute if this seems sensible. Hopefully
> these
> > > non-allocating methods can co-exist with the allocating ones and only
> > users
> > > who absolutely need to use them can make the trade-off  of better
> > > efficiency/performance for a slightly more error-prone and ugly API.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Rajiv
> > >
> > >
> > > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <altergzj@yahoo.com.invalid
> >
> > > wrote:
> > >
> > >> Hi all,The document is very beautiful and the Kafka release version
> for
> > >> this will be? and what is the timeline?
> > >> ThanksEdwin
> > >>
> > >>
> > >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> > >> rajiv@signalfuse.com> wrote:
> > >>
> > >>
> > >>  Awesome - can't wait for this version to be out!
> > >>
> > >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > >>
> > >> > The timeout in the poll call is more or less the timeout used by the
> > >> > selector. So each call to poll will do socket activity on any ready
> > >> > sockets, waiting for up to that time for a socket to be ready. There
> > is
> > >> no
> > >> > longer any background threads involved in the consumer, all activity
> > is
> > >> > driven by the application thread(s).
> > >> >
> > >> > The max fetch request wait time is controlled with a config and is
> > >> > independent of the time given to poll.
> > >> >
> > >> > -Jay
> > >> >
> > >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <
> rajiv@signalfuse.com>
> > >> > wrote:
> > >> >
> > >> > > I am trying to understand the semantics of the timeout specified
> in
> > >> the
> > >> > > poll method in
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >> > > .
> > >> > > Is this timeout a measure of how long the fetch request will be
> > >> parked on
> > >> > > the broker waiting for a reply or is this more like the timeout in
> > >> > > selector.select(long timeout) i.e. the method will return with
> > >> whatever
> > >> > > data is there after waiting a maximum of timeout. Exposing the
> > >> selector
> > >> > > timeout will be very helpful for us because we want to put a tight
> > >> bound
> > >> > on
> > >> > > how long we are ready to wait on the poll call. When this API is
> > >> > available
> > >> > > we plan to use a single thread to get data from kafka, process
> them
> > as
> > >> > well
> > >> > > as run periodic jobs. For the periodic jobs to run we need a
> > >> guarantee on
> > >> > > how much time the poll call can take at most.
> > >> > >
> > >> > > Thanks!
> > >> > >
> > >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <
> rajiv@signalfuse.com
> > >
> > >> > > wrote:
> > >> > >
> > >> > > > Thanks!
> > >> > > >
> > >> > > >
> > >> > > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com>
> > wrote:
> > >> > > >
> > >> > > >> Err, here:
> > >> > > >>
> > >> > > >>
> > >> > >
> > >> >
> > >>
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >> > > >>
> > >> > > >> -Jay
> > >> > > >>
> > >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <
> jay.kreps@gmail.com>
> > >> > wrote:
> > >> > > >>
> > >> > > >> > The current work in progress is documented here:
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> > >> rajiv@signalfuse.com
> > >> > >
> > >> > > >> > wrote:
> > >> > > >> >
> > >> > > >> >> Is there a link to the proposed new consumer non-blocking
> API?
> > >> > > >> >>
> > >> > > >> >> Thanks,
> > >> > > >> >> Rajiv
> > >> > > >> >>
> > >> > > >> >
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
>

Re: Kafka 0.9 consumer API

Posted by Jay Kreps <ja...@gmail.com>.
Zijing, the new consumer will be in the next release. We don't have a hard
date for this yet.

Rajiv, I'm game if we can show a >= 20% performance improvement. It
certainly could be an improvement, but it might also be that the CRC
validation and compression dominate.

The first step would be
  https://issues.apache.org/jira/browse/KAFKA-1895

This would delay the deserialization of ConsumerRecords and make it
basically a wrapper around the actual MemoryRecords chunks which are
basically ByteBuffer instances. We could then add an optional
ConsumerRecords param in poll, to allow you to hand back your
ConsumerRecords instance. Optimizing the ConsumerRecord instance reuse
would also be possible.

That JIRA is actually useful irrespective of these optimizations though
because deferring the deserialization and decompression would allow you to
punt that work into a pool of processor threads. Since it is more common to
see the real bottleneck be application serialization this could be valuable.
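
To make the "punt that work into a pool of processor threads" idea
concrete, here is a minimal Java sketch. It uses no Kafka classes at all:
DeferredDecodeSketch, its length-prefixed UTF-8 record layout and the
decodeAll helper are made up for illustration, standing in for raw
MemoryRecords-style chunks. The only point is that the polling thread
hands over undecoded ByteBuffers and the decoding cost is paid on worker
threads.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not Kafka API: raw chunks are decoded on a pool.
final class DeferredDecodeSketch {

    // Builds a raw chunk; each record is an int length plus UTF-8 bytes
    // (an assumed layout, chosen only to keep the example small).
    static ByteBuffer encode(String... values) {
        byte[][] encoded = new byte[values.length][];
        int size = 0;
        for (int i = 0; i < values.length; i++) {
            encoded[i] = values[i].getBytes(StandardCharsets.UTF_8);
            size += Integer.BYTES + encoded[i].length;
        }
        ByteBuffer chunk = ByteBuffer.allocate(size);
        for (byte[] bytes : encoded) {
            chunk.putInt(bytes.length).put(bytes);
        }
        chunk.flip();
        return chunk;
    }

    // The expensive step: runs on a worker, not on the polling thread.
    static List<String> decode(ByteBuffer chunk) {
        List<String> records = new ArrayList<>();
        while (chunk.hasRemaining()) {
            byte[] bytes = new byte[chunk.getInt()];
            chunk.get(bytes);
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }

    // Submits one decode task per chunk and collects results in order.
    static List<List<String>> decodeAll(List<ByteBuffer> chunks, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (ByteBuffer chunk : chunks) {
                pending.add(pool.submit(() -> decode(chunk)));
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> future : pending) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<ByteBuffer> chunks = new ArrayList<>();
        chunks.add(encode("a", "bc"));
        chunks.add(encode("def"));
        System.out.println(decodeAll(chunks, 2));
    }
}
```

Each chunk is owned by exactly one task here, so no synchronization on
the buffers is needed; a real consumer would also have to decide when a
chunk's memory may be reused.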

WRT the object reuse I wouldn't be shocked to learn that you actually get
equivalent stuff out of the JVM allocator's own pooling and/or escape
analysis once we are doing the allocation on demand. So it would be good to
show a real performance improvement on the newer JVMs before deciding to go
this route.

-Jay


On Sat, Mar 21, 2015 at 9:17 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Just a follow up - I have implemented a pretty hacky prototype It's too
> unclean to share right now but I can clean it up if you are interested. I
> don't think it offers anything that people already don't know about though.
>
> My prototype doesn't do any metadata requests yet but I have a flyweight
> builder/parser of the FetchRequest and the FetchResponse protocol that I
> based on the protocol page at
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> .
> On my consumer thread I allocate two buffers per broker a small request
> buffer and a bigger response buffer. My assumption is that the response
> buffer will be big enough that it is never too small for the response.  If
> this assumption doesn't hold then we would have to reallocate a bigger
> buffer. These are the steps I am following right now:
>
> i) Whenever a client makes a request I fill up the relevant request
> buffer(s) using my flyweight object instead of allocating an object and
> then serializing it. I then write this buffer to the SocketChannel
> connected to the right Kafka broker. I don't yet handle the case where all
> of the request bytes could not be  written synchronously. If I were to
> handle that properly I would need to register WRITE interest on the
> SocketChannel on incomplete writes.
>
> ii) Whenever the client calls poll() I register read interest on the socket
> channel using the selector and make a select call on the underlying
> selector. If the selector says that the socket channel is readable, I read
> all the bytes possible into my response ByteBuffer. If I can read the first
> 4 bytes I know how big the response is. So I wait till all response bytes
> have been read before attempting to parse it. This might take multiple poll
> calls. Once I have received the expected number of bytes I iterate through
> the response ByteBuffer. There is no random access access provided to the
> ByteBuffer since there are way too many variable length fields. This a
> DISADVANTAGE of doing flyweight style parsing instead of deserialization
> into POJOs with indexed data structures like maps.  I could build indexing
> with a non-allocating primitive hashmap like the koloboke one mapping
> Topic/Partition/Messages to offsets in the ByteBuffer if really required.
> For me the lack of random access has not been a problem at all. Iteration
> is done just by providing a ByteBuffer and offset (within ByteBuffer) pair
> for each message. In my own application I wrap this ByteBuffer and offset
> pair in my own flyweight which knows how to decode the data.
>
> iii) Once an entire response has been iterated through I can re-use both
> the request as well as response buffers.
>
> I am sure this can be improved upon a lot.  I allocate the following before
> starting my client:
>     i) A Direct ByteBuffer for requests.
>    ii) A Direct ByteBuffer for responses. Sizes of the ByteBuffers are
> chosen so that they can fit the biggest request/responses we expect.
>    ii) A flyweight that wraps the Request ByteBuffer and can be used to
> write a particular kind of request.  So far I have only written a flyweight
> for a FetchRequest.
>   iii) A flyweight that wraps the Response ByteBuffer and can be used to
> iterate through the entire response including finding errors. There is no
> random access allowed right now. So far I have only written a flyweight
> parser for a FetchResponse.
>   iv) A flyweight for every type of application level message that I
> expect. Iterating through the response ByteBuffer using the flyweight in
> (iii) I get offsets into the ByteBuffer for each individual message. My own
> messages work by using absolute position getLong(position),
> getShort(position) etc calls on a ByteBuffer, so this works out great for
> me. We could alternatively provide an API that does allocates a new
> ByteBuffer and copies the data for people who don't want the zero
> allocation access.
>
> Sadly in my profiling I noticed that selector implementation in the
> JDK allocates
> but it seems like projects like Netty, Aeron have worked around this by
> using reflection to replace the underlying implementation to make it
> non-allocating. Other than that I have absolutely zero allocations past the
> initial 4-5 allocations. I also have absolutely zero copies in user space
> once the data lands from the socket onto the ByteBuffer.
>
> On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I had a few more thoughts on the new API. Currently we use kafka to
> > transfer really compact messages - around 25-35 bytes each. Our use case
> is
> > a lot of messages but each very small. Will it be possible to do the
> > following
> > to reuse a ConsumerRecord and the ConsumerRecords objects? We employ our
> > own binary encoding to encode our messages and it is encoded as a
> flyweight
> > in SBE/Cap'n'Proto style and can be consumed without any
> > decoding/deserialization step. Currently we use the SimpleConsumer and
> have
> > found the fact that it hands out ByteBuffers very useful instead of a
> > mandatory deserialization into a POJO step. Even then through memory
> > profiling we have found out that the ByteBuffers and the records take
> more
> > space than the actual messages themselves. Ideally we can allocate a big
> > ByteBuffer (maybe kafka already does it) to receive our data and then
> just
> > get some kind of a flyweight iterator on the ByteBuffer something like
> this:
> >
> > // Allocate a single ByteBuffer or have kafka allocate this internally.
> > Either way it would be very useful to just keep re-using this buffer.
> > ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> > // Create a coupe of flyweights.
> > ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> > ConsumerRecord record = new ConsumerRecord();
> > // Subscribe to the topics we care about.
> > consumer.subscribe(TopicPartitions_I_am_interested_in);
> > while (running) {
> >   // Use the buffer to get data from the server. Additionally  re-use the
> > same iterator.
> >   // Resets the iterator to point to the start of the ByteBuffer.
> >   consumer.poll(buffer, iterator, timeout);
> >   // Now the iterator has pointers to the ByteBuffer and is capable of
> > advancing the cursor to read every message.
> >   while (iterator.hasNext()) {
> >     // The consumer record is just a flyweight over a ByteBuffer and is
> > adjusted to point to the start of the next record.
> >     iterator.getNextInto(record);
> >     // This is not a new ByteBuffer, its just the big buffer with its
> > position and limit adjusted so that we only read the current record.
> >     // Alternatively we could give it our own ByteBuffer, but the point
> > would be to not do a copy and instead adjust the supplied
> >     // ByteBuffer's position and limit and pointer (through reflection)
> to
> > point to the right slice of the actual big ByteBuffer.
> >     // This means that we cannot stash this buffer in a hash map or any
> > heap allocated structure since it's contents keep changing as we iterate.
> >     ByteBuffer buffer = record.getUnderlying();
> >     process(buffer);  // Process cannot keep a reference to the buffer -
> > this is really the programmer's responsibility.
> >   }
> > }
> >
> > Given how the new consumer is meant to be used from a single thread - I
> > think these optional non-allocating methods will be a great boon for any
> > one trying to save memory or prevent heap churn. It has exactly 0 copies
> in
> > user space too which is great for performance. In our case since the
> > messages are very tiny we end up spending more memory in all the wrapper
> > objects than in the actual messages so this would be a game changer for
> us.
> > I'd love to be able to contribute if this seems sensible. Hopefully these
> > non-allocating methods can co-exist with the allocating ones and only
> users
> > who absolutely need to use them can make the trade-off  of better
> > efficiency/performance for a slightly more error-prone and ugly API.
> >
> > Thoughts?
> >
> > Thanks,
> > Rajiv
> >
> >
> > On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <al...@yahoo.com.invalid>
> > wrote:
> >
> >> Hi all,The document is very beautiful and the Kafka release version for
> >> this will be? and what is the timeline?
> >> ThanksEdwin
> >>
> >>
> >>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
> >> rajiv@signalfuse.com> wrote:
> >>
> >>
> >>  Awesome - can't wait for this version to be out!
> >>
> >> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> >>
> >> > The timeout in the poll call is more or less the timeout used by the
> >> > selector. So each call to poll will do socket activity on any ready
> >> > sockets, waiting for up to that time for a socket to be ready. There
> is
> >> no
> >> > longer any background threads involved in the consumer, all activity
> is
> >> > driven by the application thread(s).
> >> >
> >> > The max fetch request wait time is controlled with a config and is
> >> > independent of the time given to poll.
> >> >
> >> > -Jay
> >> >
> >> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
> >> > wrote:
> >> >
> >> > > I am trying to understand the semantics of the timeout specified in
> >> the
> >> > > poll method in
> >> > >
> >> > >
> >> >
> >>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >> > > .
> >> > > Is this timeout a measure of how long the fetch request will be
> >> parked on
> >> > > the broker waiting for a reply or is this more like the timeout in
> >> > > selector.select(long timeout) i.e. the method will return with
> >> whatever
> >> > > data is there after waiting a maximum of timeout. Exposing the
> >> selector
> >> > > timeout will be very helpful for us because we want to put a tight
> >> bound
> >> > on
> >> > > how long we are ready to wait on the poll call. When this API is
> >> > available
> >> > > we plan to use a single thread to get data from kafka, process them
> as
> >> > well
> >> > > as run periodic jobs. For the periodic jobs to run we need a
> >> guarantee on
> >> > > how much time the poll call can take at most.
> >> > >
> >> > > Thanks!
> >> > >
> >> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <rajiv@signalfuse.com
> >
> >> > > wrote:
> >> > >
> >> > > > Thanks!
> >> > > >
> >> > > >
> >> > > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com>
> wrote:
> >> > > >
> >> > > >> Err, here:
> >> > > >>
> >> > > >>
> >> > >
> >> >
> >>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >> > > >>
> >> > > >> -Jay
> >> > > >>
> >> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com>
> >> > wrote:
> >> > > >>
> >> > > >> > The current work in progress is documented here:
> >> > > >> >
> >> > > >> >
> >> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> >> rajiv@signalfuse.com
> >> > >
> >> > > >> > wrote:
> >> > > >> >
> >> > > >> >> Is there a link to the proposed new consumer non-blocking API?
> >> > > >> >>
> >> > > >> >> Thanks,
> >> > > >> >> Rajiv
> >> > > >> >>
> >> > > >> >
> >> > > >> >
> >> > > >>
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >>
> >
> >
>

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Just a follow up - I have implemented a pretty hacky prototype. It's too
unclean to share right now but I can clean it up if you are interested. I
don't think it offers anything that people don't already know about though.

My prototype doesn't do any metadata requests yet but I have a flyweight
builder/parser of the FetchRequest and the FetchResponse protocol that I
based on the protocol page at
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol.
On my consumer thread I allocate two buffers per broker: a small request
buffer and a bigger response buffer. My assumption is that the response
buffer will always be big enough for the response. If this assumption
doesn't hold then we would have to reallocate a bigger buffer. These are
the steps I am following right now:

i) Whenever a client makes a request I fill up the relevant request
buffer(s) using my flyweight object instead of allocating an object and
then serializing it. I then write this buffer to the SocketChannel
connected to the right Kafka broker. I don't yet handle the case where not
all of the request bytes can be written synchronously. To handle that
properly I would need to register WRITE interest on the SocketChannel on
incomplete writes.

ii) Whenever the client calls poll() I register read interest on the socket
channel using the selector and make a select call on the underlying
selector. If the selector says that the socket channel is readable, I read
all the bytes possible into my response ByteBuffer. If I can read the first
4 bytes I know how big the response is. So I wait till all response bytes
have been read before attempting to parse it. This might take multiple poll
calls. Once I have received the expected number of bytes I iterate through
the response ByteBuffer. There is no random access provided to the
ByteBuffer since there are way too many variable-length fields. This is a
DISADVANTAGE of doing flyweight-style parsing instead of deserialization
into POJOs with indexed data structures like maps.  I could build indexing
with a non-allocating primitive hashmap like the koloboke one mapping
Topic/Partition/Messages to offsets in the ByteBuffer if really required.
For me the lack of random access has not been a problem at all. Iteration
is done just by providing a ByteBuffer and offset (within ByteBuffer) pair
for each message. In my own application I wrap this ByteBuffer and offset
pair in my own flyweight which knows how to decode the data.

iii) Once an entire response has been iterated through I can re-use both
the request as well as response buffers.
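
Step ii) above can be sketched roughly as follows. This is not the
prototype's code: ResponseAccumulator and its method names are
hypothetical, and the selector handling is left out. It only shows how a
response with a 4-byte size prefix can be collected in one reusable direct
ByteBuffer across several reads, using the buffer's limit so that a read
never runs past the current response.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Hypothetical helper (not part of Kafka): accumulates one size-prefixed
// response in a reusable direct buffer across several reads.
final class ResponseAccumulator {
    private static final int SIZE_PREFIX = Integer.BYTES;
    private final ByteBuffer response;

    ResponseAccumulator(int capacity) {
        response = ByteBuffer.allocateDirect(capacity);
    }

    // Reads what the channel has ready. Returns true once the 4-byte size
    // prefix and the whole body are in the buffer.
    boolean readFrom(ReadableByteChannel channel) throws IOException {
        if (response.position() < SIZE_PREFIX) {
            response.limit(SIZE_PREFIX); // read the size prefix only
            if (channel.read(response) < 0) throw new EOFException("closed");
            if (response.position() < SIZE_PREFIX) return false;
            int bodySize = response.getInt(0); // absolute read, big-endian
            if (bodySize < 0 || bodySize > response.capacity() - SIZE_PREFIX) {
                throw new IllegalStateException("response does not fit: " + bodySize);
            }
            response.limit(SIZE_PREFIX + bodySize); // stop at end of body
        }
        if (response.hasRemaining() && channel.read(response) < 0) {
            throw new EOFException("closed");
        }
        return !response.hasRemaining();
    }

    // Call only after readFrom returned true. Returns the same buffer,
    // positioned at the first body byte; no copy and no new object.
    ByteBuffer body() {
        response.position(SIZE_PREFIX);
        return response;
    }

    // Makes the buffer ready for the next response.
    void reset() {
        response.clear();
    }

    public static void main(String[] args) throws IOException {
        byte[] frame = {0, 0, 0, 3, 10, 20, 30};
        ResponseAccumulator acc = new ResponseAccumulator(64);
        // Two in-memory channels simulate a response arriving in two pieces.
        ReadableByteChannel first =
            Channels.newChannel(new ByteArrayInputStream(frame, 0, 2));
        ReadableByteChannel rest =
            Channels.newChannel(new ByteArrayInputStream(frame, 2, 5));
        System.out.println(acc.readFrom(first)); // size prefix incomplete
        System.out.println(acc.readFrom(rest));  // whole response buffered
        System.out.println(acc.body().remaining());
        acc.reset();
    }
}
```

With a real SocketChannel the same readFrom call would be made each time
the selector reports the channel as readable, which is why completing one
response may take multiple poll calls.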

I am sure this can be improved upon a lot. I allocate the following before
starting my client:
    i) A direct ByteBuffer for requests.
   ii) A direct ByteBuffer for responses. Sizes of the ByteBuffers are
chosen so that they can fit the biggest request/responses we expect.
  iii) A flyweight that wraps the request ByteBuffer and can be used to
write a particular kind of request. So far I have only written a flyweight
for a FetchRequest.
   iv) A flyweight that wraps the response ByteBuffer and can be used to
iterate through the entire response including finding errors. There is no
random access allowed right now. So far I have only written a flyweight
parser for a FetchResponse.
    v) A flyweight for every type of application level message that I
expect. Iterating through the response ByteBuffer using the flyweight in
(iv) I get offsets into the ByteBuffer for each individual message. My own
messages work by using absolute position getLong(position),
getShort(position) etc. calls on a ByteBuffer, so this works out great for
me. We could alternatively provide an API that allocates a new
ByteBuffer and copies the data for people who don't want the zero
allocation access.
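
To make the application-level flyweight idea concrete, here is a rough
sketch under assumptions. The message layout (8-byte timestamp, 2-byte
metric id, 8-byte value) and the SampleFlyweight name are invented for
illustration; only the use of absolute getLong/getShort calls comes from
the description above.

```java
import java.nio.ByteBuffer;

/** Hypothetical message layout: [long timestamp][short metricId][long value]. */
final class SampleFlyweight {
    private static final int TIMESTAMP_OFFSET = 0;
    private static final int METRIC_ID_OFFSET = 8;
    private static final int VALUE_OFFSET = 10;
    static final int LENGTH = 18;

    private ByteBuffer buffer; // shared response buffer, never copied
    private int offset;        // start of the current message within the buffer

    /** Re-points this flyweight at another message; allocates nothing. */
    SampleFlyweight wrap(ByteBuffer buffer, int offset) {
        this.buffer = buffer;
        this.offset = offset;
        return this;
    }

    // Absolute getters read in place and do not move the buffer's position.
    long timestamp() { return buffer.getLong(offset + TIMESTAMP_OFFSET); }
    short metricId() { return buffer.getShort(offset + METRIC_ID_OFFSET); }
    long value()     { return buffer.getLong(offset + VALUE_OFFSET); }
}
```

One instance is created up front and re-wrapped for every (ByteBuffer,
offset) pair handed out by the response iterator, so decoding a message
costs a few bounds-checked reads and no allocation.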

Sadly in my profiling I noticed that the selector implementation in the
JDK allocates, but it seems that projects like Netty and Aeron have worked
around this by using reflection to replace the underlying implementation to
make it non-allocating. Other than that I have absolutely zero allocations
past the initial 4-5 allocations. I also have absolutely zero copies in user
space once the data lands from the socket onto the ByteBuffer.

On Sat, Mar 21, 2015 at 11:16 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> I had a few more thoughts on the new API. Currently we use kafka to
> transfer really compact messages - around 25-35 bytes each. Our use case is
> a lot of messages but each very small. Will it be possible to do the
> following
> to reuse a ConsumerRecord and the ConsumerRecords objects? We employ our
> own binary encoding to encode our messages and it is encoded as a flyweight
> in SBE/Cap'n'Proto style and can be consumed without any
> decoding/deserialization step. Currently we use the SimpleConsumer and have
> found the fact that it hands out ByteBuffers very useful instead of a
> mandatory deserialization into a POJO step. Even then through memory
> profiling we have found out that the ByteBuffers and the records take more
> space than the actual messages themselves. Ideally we can allocate a big
> ByteBuffer (maybe kafka already does it) to receive our data and then just
> get some kind of a flyweight iterator on the ByteBuffer something like this:
>
> // Allocate a single ByteBuffer or have kafka allocate this internally.
> Either way it would be very useful to just keep re-using this buffer.
> ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
> // Create a coupe of flyweights.
> ConsumerRecordIterator iterator = new ConsumerRecordIterator();
> ConsumerRecord record = new ConsumerRecord();
> // Subscribe to the topics we care about.
> consumer.subscribe(TopicPartitions_I_am_interested_in);
> while (running) {
>   // Use the buffer to get data from the server. Additionally  re-use the
> same iterator.
>   // Resets the iterator to point to the start of the ByteBuffer.
>   consumer.poll(buffer, iterator, timeout);
>   // Now the iterator has pointers to the ByteBuffer and is capable of
> advancing the cursor to read every message.
>   while (iterator.hasNext()) {
>     // The consumer record is just a flyweight over a ByteBuffer and is
> adjusted to point to the start of the next record.
>     iterator.getNextInto(record);
>     // This is not a new ByteBuffer, its just the big buffer with its
> position and limit adjusted so that we only read the current record.
>     // Alternatively we could give it our own ByteBuffer, but the point
> would be to not do a copy and instead adjust the supplied
>     // ByteBuffer's position and limit and pointer (through reflection) to
> point to the right slice of the actual big ByteBuffer.
>     // This means that we cannot stash this buffer in a hash map or any
> heap allocated structure since it's contents keep changing as we iterate.
>     ByteBuffer buffer = record.getUnderlying();
>     process(buffer);  // Process cannot keep a reference to the buffer -
> this is really the programmer's responsibility.
>   }
> }
>
> Given how the new consumer is meant to be used from a single thread - I
> think these optional non-allocating methods will be a great boon for any
> one trying to save memory or prevent heap churn. It has exactly 0 copies in
> user space too which is great for performance. In our case since the
> messages are very tiny we end up spending more memory in all the wrapper
> objects than in the actual messages so this would be a game changer for us.
> I'd love to be able to contribute if this seems sensible. Hopefully these
> non-allocating methods can co-exist with the allocating ones and only users
> who absolutely need to use them can make the trade-off  of better
> efficiency/performance for a slightly more error-prone and ugly API.
>
> Thoughts?
>
> Thanks,
> Rajiv
>
>
> On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <al...@yahoo.com.invalid>
> wrote:
>
>> Hi all,The document is very beautiful and the Kafka release version for
>> this will be? and what is the timeline?
>> ThanksEdwin
>>
>>
>>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <
>> rajiv@signalfuse.com> wrote:
>>
>>
>>  Awesome - can't wait for this version to be out!
>>
>> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com> wrote:
>>
>> > The timeout in the poll call is more or less the timeout used by the
>> > selector. So each call to poll will do socket activity on any ready
>> > sockets, waiting for up to that time for a socket to be ready. There is
>> no
>> > longer any background threads involved in the consumer, all activity is
>> > driven by the application thread(s).
>> >
>> > The max fetch request wait time is controlled with a config and is
>> > independent of the time given to poll.
>> >
>> > -Jay
>> >
>> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
>> > wrote:
>> >
>> > > I am trying to understand the semantics of the timeout specified in
>> the
>> > > poll method in
>> > >
>> > >
>> >
>> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>> > > .
>> > > Is this timeout a measure of how long the fetch request will be
>> parked on
>> > > the broker waiting for a reply or is this more like the timeout in
>> > > selector.select(long timeout) i.e. the method will return with
>> whatever
>> > > data is there after waiting a maximum of timeout. Exposing the
>> selector
>> > > timeout will be very helpful for us because we want to put a tight
>> bound
>> > on
>> > > how long we are ready to wait on the poll call. When this API is
>> > available
>> > > we plan to use a single thread to get data from kafka, process them as
>> > well
>> > > as run periodic jobs. For the periodic jobs to run we need a
>> guarantee on
>> > > how much time the poll call can take at most.
>> > >
>> > > Thanks!
>> > >
>> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com>
>> > > wrote:
>> > >
>> > > > Thanks!
>> > > >
>> > > >
>> > > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:
>> > > >
>> > > >> Err, here:
>> > > >>
>> > > >>
>> > >
>> >
>> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>> > > >>
>> > > >> -Jay
>> > > >>
>> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com>
>> > wrote:
>> > > >>
>> > > >> > The current work in progress is documented here:
>> > > >> >
>> > > >> >
>> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
>> rajiv@signalfuse.com
>> > >
>> > > >> > wrote:
>> > > >> >
>> > > >> >> Is there a link to the proposed new consumer non-blocking API?
>> > > >> >>
>> > > >> >> Thanks,
>> > > >> >> Rajiv
>> > > >> >>
>> > > >> >
>> > > >> >
>> > > >>
>> > > >
>> > >
>> >
>>
>>
>>
>>
>
>

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
I had a few more thoughts on the new API. Currently we use Kafka to
transfer really compact messages, around 25-35 bytes each. Our use case is
a lot of messages, each very small. Will it be possible to do the following
to reuse a ConsumerRecord and the ConsumerRecords objects? We employ our
own binary encoding for our messages; it is encoded as a flyweight in
SBE/Cap'n Proto style and can be consumed without any
decoding/deserialization step. Currently we use the SimpleConsumer and have
found it very useful that it hands out ByteBuffers instead of forcing a
mandatory deserialization into a POJO. Even then, memory profiling has shown
that the ByteBuffers and the records take more space than the actual
messages themselves. Ideally we could allocate a big ByteBuffer (maybe Kafka
already does this) to receive our data and then just get some kind of
flyweight iterator on the ByteBuffer, something like this:

// Allocate a single ByteBuffer or have Kafka allocate this internally.
// Either way it would be very useful to just keep re-using this buffer.
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
// Create a couple of flyweights.
ConsumerRecordIterator iterator = new ConsumerRecordIterator();
ConsumerRecord record = new ConsumerRecord();
// Subscribe to the topics we care about.
consumer.subscribe(TopicPartitions_I_am_interested_in);
while (running) {
  // Use the buffer to get data from the server. Additionally re-use the
  // same iterator.
  // Resets the iterator to point to the start of the ByteBuffer.
  consumer.poll(buffer, iterator, timeout);
  // Now the iterator has pointers to the ByteBuffer and is capable of
  // advancing the cursor to read every message.
  while (iterator.hasNext()) {
    // The consumer record is just a flyweight over a ByteBuffer and is
    // adjusted to point to the start of the next record.
    iterator.getNextInto(record);
    // This is not a new ByteBuffer, it's just the big buffer with its
    // position and limit adjusted so that we only read the current record.
    // Alternatively we could give it our own ByteBuffer, but the point
    // would be to not do a copy and instead adjust the supplied
    // ByteBuffer's position and limit and pointer (through reflection) to
    // point to the right slice of the actual big ByteBuffer.
    // This means that we cannot stash this buffer in a hash map or any
    // heap allocated structure since its contents keep changing as we
    // iterate.
    ByteBuffer recordBuffer = record.getUnderlying();
    // Process cannot keep a reference to the buffer;
    // this is really the programmer's responsibility.
    process(recordBuffer);
  }
}
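
As a rough illustration of how such a non-allocating iterator might work
internally, here is a sketch under assumptions. RecordCursor is a
hypothetical stand-in for the proposed ConsumerRecordIterator, and the
record layout ([int length][payload]) is a simplification, not the actual
Kafka message format. A trailing record that is only partially present is
skipped.

```java
import java.nio.ByteBuffer;

/** Hypothetical cursor over records laid out as [int length][payload bytes]. */
final class RecordCursor {
    private final ByteBuffer index; // full-range view, used only for absolute reads
    private final ByteBuffer view;  // single reusable view handed to the caller
    private int next;               // offset of the next record's length prefix
    private int end;                // offset one past the last received byte

    RecordCursor(ByteBuffer bigBuffer) {
        // Both views share memory with bigBuffer and are allocated once, up front.
        this.index = bigBuffer.duplicate();
        this.index.clear();
        this.view = bigBuffer.duplicate();
    }

    /** Call after each poll: records occupy [0, bytesReceived) of the big buffer. */
    void reset(int bytesReceived) {
        next = 0;
        end = bytesReceived;
    }

    /** True only if a complete record (prefix and payload) is available. */
    boolean hasNext() {
        return next + Integer.BYTES <= end
            && next + Integer.BYTES + index.getInt(next) <= end;
    }

    /** Returns the shared view narrowed to the next payload. Do not retain it. */
    ByteBuffer next() {
        int start = next + Integer.BYTES;
        int stop = start + index.getInt(next);
        view.clear();      // widen first so the new limit/position are always legal
        view.limit(stop);
        view.position(start);
        next = stop;
        return view;
    }
}
```

Because the same view object is returned on every call, the caller must
finish with one record before asking for the next, which is exactly the
restriction described in the comments above.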

Given how the new consumer is meant to be used from a single thread, I
think these optional non-allocating methods will be a great boon for anyone
trying to save memory or prevent heap churn. It has exactly 0 copies in
user space too, which is great for performance. In our case, since the
messages are very tiny, we end up spending more memory on all the wrapper
objects than on the actual messages, so this would be a game changer for us.
I'd love to be able to contribute if this seems sensible. Hopefully these
non-allocating methods can co-exist with the allocating ones, and only users
who absolutely need them can make the trade-off of better
efficiency/performance for a slightly more error-prone and ugly API.

Thoughts?

Thanks,
Rajiv


On Sat, Mar 21, 2015 at 9:08 AM, Zijing Guo <al...@yahoo.com.invalid>
wrote:

> Hi all,The document is very beautiful and the Kafka release version for
> this will be? and what is the timeline?
> ThanksEdwin
>
>
>      On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
>
>  Awesome - can't wait for this version to be out!
>
> On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com> wrote:
>
> > The timeout in the poll call is more or less the timeout used by the
> > selector. So each call to poll will do socket activity on any ready
> > sockets, waiting for up to that time for a socket to be ready. There is
> no
> > longer any background threads involved in the consumer, all activity is
> > driven by the application thread(s).
> >
> > The max fetch request wait time is controlled with a config and is
> > independent of the time given to poll.
> >
> > -Jay
> >
> > On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > I am trying to understand the semantics of the timeout specified in the
> > > poll method in
> > >
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > .
> > > Is this timeout a measure of how long the fetch request will be parked
> on
> > > the broker waiting for a reply or is this more like the timeout in
> > > selector.select(long timeout) i.e. the method will return with whatever
> > > data is there after waiting a maximum of timeout. Exposing the selector
> > > timeout will be very helpful for us because we want to put a tight
> bound
> > on
> > > how long we are ready to wait on the poll call. When this API is
> > available
> > > we plan to use a single thread to get data from kafka, process them as
> > well
> > > as run periodic jobs. For the periodic jobs to run we need a guarantee
> on
> > > how much time the poll call can take at most.
> > >
> > > Thanks!
> > >
> > > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > >
> > > > Thanks!
> > > >
> > > >
> > > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:
> > > >
> > > >> Err, here:
> > > >>
> > > >>
> > >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > >>
> > > >> -Jay
> > > >>
> > > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com>
> > wrote:
> > > >>
> > > >> > The current work in progress is documented here:
> > > >> >
> > > >> >
> > > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <
> rajiv@signalfuse.com
> > >
> > > >> > wrote:
> > > >> >
> > > >> >> Is there a link to the proposed new consumer non-blocking API?
> > > >> >>
> > > >> >> Thanks,
> > > >> >> Rajiv
> > > >> >>
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
>
>
>
>

Re: Kafka 0.9 consumer API

Posted by Zijing Guo <al...@yahoo.com.INVALID>.
Hi all,
The documentation is very beautiful. Which Kafka release version will this
be in, and what is the timeline?
Thanks,
Edwin


     On Friday, March 20, 2015 4:20 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
   

 Awesome - can't wait for this version to be out!

On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com> wrote:

> The timeout in the poll call is more or less the timeout used by the
> selector. So each call to poll will do socket activity on any ready
> sockets, waiting for up to that time for a socket to be ready. There is no
> longer any background threads involved in the consumer, all activity is
> driven by the application thread(s).
>
> The max fetch request wait time is controlled with a config and is
> independent of the time given to poll.
>
> -Jay
>
> On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I am trying to understand the semantics of the timeout specified in the
> > poll method in
> >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > .
> > Is this timeout a measure of how long the fetch request will be parked on
> > the broker waiting for a reply or is this more like the timeout in
> > selector.select(long timeout) i.e. the method will return with whatever
> > data is there after waiting a maximum of timeout. Exposing the selector
> > timeout will be very helpful for us because we want to put a tight bound
> on
> > how long we are ready to wait on the poll call. When this API is
> available
> > we plan to use a single thread to get data from kafka, process them as
> well
> > as run periodic jobs. For the periodic jobs to run we need a guarantee on
> > how much time the poll call can take at most.
> >
> > Thanks!
> >
> > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Thanks!
> > >
> > >
> > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > >> Err, here:
> > >>
> > >>
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >>
> > >> -Jay
> > >>
> > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >>
> > >> > The current work in progress is documented here:
> > >> >
> > >> >
> > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <rajiv@signalfuse.com
> >
> > >> > wrote:
> > >> >
> > >> >> Is there a link to the proposed new consumer non-blocking API?
> > >> >>
> > >> >> Thanks,
> > >> >> Rajiv
> > >> >>
> > >> >
> > >> >
> > >>
> > >
> >
>


   

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Awesome - can't wait for this version to be out!

On Fri, Mar 20, 2015 at 12:22 PM, Jay Kreps <ja...@gmail.com> wrote:

> The timeout in the poll call is more or less the timeout used by the
> selector. So each call to poll will do socket activity on any ready
> sockets, waiting for up to that time for a socket to be ready. There is no
> longer any background threads involved in the consumer, all activity is
> driven by the application thread(s).
>
> The max fetch request wait time is controlled with a config and is
> independent of the time given to poll.
>
> -Jay
>
> On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I am trying to understand the semantics of the timeout specified in the
> > poll method in
> >
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > .
> > Is this timeout a measure of how long the fetch request will be parked on
> > the broker waiting for a reply or is this more like the timeout in
> > selector.select(long timeout) i.e. the method will return with whatever
> > data is there after waiting a maximum of timeout. Exposing the selector
> > timeout will be very helpful for us because we want to put a tight bound
> on
> > how long we are ready to wait on the poll call. When this API is
> available
> > we plan to use a single thread to get data from kafka, process them as
> well
> > as run periodic jobs. For the periodic jobs to run we need a guarantee on
> > how much time the poll call can take at most.
> >
> > Thanks!
> >
> > On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Thanks!
> > >
> > >
> > > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:
> > >
> > >> Err, here:
> > >>
> > >>
> >
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >>
> > >> -Jay
> > >>
> > >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com>
> wrote:
> > >>
> > >> > The current work in progress is documented here:
> > >> >
> > >> >
> > >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <rajiv@signalfuse.com
> >
> > >> > wrote:
> > >> >
> > >> >> Is there a link to the proposed new consumer non-blocking API?
> > >> >>
> > >> >> Thanks,
> > >> >> Rajiv
> > >> >>
> > >> >
> > >> >
> > >>
> > >
> >
>

Re: Kafka 0.9 consumer API

Posted by Jay Kreps <ja...@gmail.com>.
The timeout in the poll call is more or less the timeout used by the
selector. So each call to poll will do socket activity on any ready
sockets, waiting for up to that time for a socket to be ready. There are no
longer any background threads involved in the consumer; all activity is
driven by the application thread(s).

The max fetch request wait time is controlled with a config and is
independent of the time given to poll.
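
For a single-threaded application that also runs periodic jobs, one way to
use this is to cap each poll timeout by the time remaining until the next
job. The sketch below is only an illustration: PollBudget and its methods
are hypothetical and are not part of the consumer API, and since the poll
timeout is only "more or less" the selector timeout, the bound it gives is
approximate.

```java
/** Hypothetical helper that budgets poll timeouts around one periodic job. */
final class PollBudget {
    private final long periodMs;
    private long nextRunAtMs;

    PollBudget(long periodMs, long nowMs) {
        this.periodMs = periodMs;
        this.nextRunAtMs = nowMs + periodMs;
    }

    /** Timeout to hand to poll so the periodic job is not delayed past its deadline. */
    long pollTimeoutMs(long nowMs, long maxPollMs) {
        long untilJob = Math.max(0L, nextRunAtMs - nowMs);
        return Math.min(maxPollMs, untilJob);
    }

    /** Returns true, and schedules the next run, once the periodic job is due. */
    boolean jobDue(long nowMs) {
        if (nowMs < nextRunAtMs) {
            return false;
        }
        nextRunAtMs = nowMs + periodMs;
        return true;
    }
}
```

The application loop would compute pollTimeoutMs(now, maxPoll), pass the
result to poll, process whatever records come back, and then run the
periodic job whenever jobDue(now) returns true.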

-Jay

On Fri, Mar 20, 2015 at 11:30 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> I am trying to understand the semantics of the timeout specified in the
> poll method in
>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> .
> Is this timeout a measure of how long the fetch request will be parked on
> the broker waiting for a reply or is this more like the timeout in
> selector.select(long timeout) i.e. the method will return with whatever
> data is there after waiting a maximum of timeout. Exposing the selector
> timeout will be very helpful for us because we want to put a tight bound on
> how long we are ready to wait on the poll call. When this API is available
> we plan to use a single thread to get data from kafka, process them as well
> as run periodic jobs. For the periodic jobs to run we need a guarantee on
> how much time the poll call can take at most.
>
> Thanks!
>
> On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Thanks!
> >
> >
> > On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:
> >
> >> Err, here:
> >>
> >>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >>
> >> -Jay
> >>
> >> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com> wrote:
> >>
> >> > The current work in progress is documented here:
> >> >
> >> >
> >> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> > wrote:
> >> >
> >> >> Is there a link to the proposed new consumer non-blocking API?
> >> >>
> >> >> Thanks,
> >> >> Rajiv
> >> >>
> >> >
> >> >
> >>
> >
>

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
I am trying to understand the semantics of the timeout specified in the
poll method documented at
http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Is this timeout a measure of how long the fetch request will be parked on
the broker waiting for a reply, or is it more like the timeout in
selector.select(long timeout), i.e. the method will return with whatever
data is there after waiting at most that long? Exposing the selector
timeout will be very helpful for us because we want to put a tight bound on
how long we are ready to wait on the poll call. When this API is available
we plan to use a single thread to get data from Kafka, process it, and
run periodic jobs. For the periodic jobs to run we need a guarantee on
how much time the poll call can take at most.

Thanks!

On Fri, Mar 20, 2015 at 6:59 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Thanks!
>
>
> On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:
>
>> Err, here:
>>
>> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>>
>> -Jay
>>
>> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com> wrote:
>>
>> > The current work in progress is documented here:
>> >
>> >
>> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com>
>> > wrote:
>> >
>> >> Is there a link to the proposed new consumer non-blocking API?
>> >>
>> >> Thanks,
>> >> Rajiv
>> >>
>> >
>> >
>>
>

Re: Kafka 0.9 consumer API

Posted by Rajiv Kurian <ra...@signalfuse.com>.
Thanks!


On Thursday, March 19, 2015, Jay Kreps <ja...@gmail.com> wrote:

> Err, here:
>
> http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> -Jay
>
> On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <jay.kreps@gmail.com
> <javascript:;>> wrote:
>
> > The current work in progress is documented here:
> >
> >
> > On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <rajiv@signalfuse.com
> <javascript:;>>
> > wrote:
> >
> >> Is there a link to the proposed new consumer non-blocking API?
> >>
> >> Thanks,
> >> Rajiv
> >>
> >
> >
>

Re: Kafka 0.9 consumer API

Posted by Jay Kreps <ja...@gmail.com>.
Err, here:
http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

-Jay

On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps <ja...@gmail.com> wrote:

> The current work in progress is documented here:
>
>
> On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
>> Is there a link to the proposed new consumer non-blocking API?
>>
>> Thanks,
>> Rajiv
>>
>
>

Re: Kafka 0.9 consumer API

Posted by Jay Kreps <ja...@gmail.com>.
The current work in progress is documented here:


On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Is there a link to the proposed new consumer non-blocking API?
>
> Thanks,
> Rajiv
>