You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Rajiv Kurian <ra...@signalfx.com> on 2016/02/05 00:21:50 UTC

Kafka protocol fetch request max wait.

I am writing a Kafka consumer client using the document at
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

One place where I am having problems is the fetch request itself. I am able
to send fetch requests and can get fetch responses that I can parse
properly, but it seems like the broker is not respecting my max wait time
and min fetch bytes parameters.

To test this part I am sending in a fetch request for 128 partitions of a
single topic  that hasn't seen any messages for a while and is currently
empty. All 128 partitions are on the same broker (running 0.9). I would
expect the broker to NOT send me any replies till my max_wait_time_ms
elapses but it is sending me a reply immediately. This reply is empty (as
expected) since the partitions have no data and I can parse the data just
fine but I don't understand why the broker is sending me a reply
immediately instead of waiting long enough.

Here is how I make a request:

private ByteBuffer createFetchRequestBuffer(int numPartitions) {
    // This does the math to get the size required.
    final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
    // Size field
    int sizeField = sizeRequired - 4;
    buffer.putInt(sizeField);
    // API key.
    buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
    // API version.
    buffer.putShort((short) 0);
    // Correlation id.
    buffer.putInt(-3);  // Just a random correlation id.
    // Client id.
    buffer.putShort(numClientStringBytes); // The length of the client
string as a short.
    buffer.put(clientStringBytes); // The client string bytes.
    // Replica id.
    buffer.putInt(-1);  // As per the recommendation.
    // Max wait time in ms.
    buffer.putInt(30 * 1000); // Should be 30 seconds.
    // Min bytes field size.
    buffer.putInt(1000000);  // A big number.
    // Num topics.
    buffer.putInt(1); // A single topic.
    // Topic string.
    buffer.putShort(numTopicBytes); // The length of the topic string as a
short.
    buffer.put(topicBytes); // The topic string bytes.
    // Num partitions field.
    buffer.putInt(numPartitions); // 128 like I said.
    for (int i = 0; i < numPartitions; i++) {
      final int partitionId = i;
      // partition number.
      buffer.putInt(partitionId);
      // offset.
      buffer.putLong(partitionToOffset[partitionId]); // I have an array of
longs to get this from.
      // maxBytesPerPartition.
      buffer.putInt(maxBytesPerPartition);
    }

    buffer.flip();

    return buffer;
}

I get a response pretty much immediately when I write this request to the
broker. The response parses just fine but has no actual non zero size
message sets.

Thanks in advance.
Rajiv

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
Thanks Jason.

On Fri, Feb 5, 2016 at 10:13 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> Thanks for all the updates. I think I've been able to reproduce this. The
> key seems to be waiting for the old log segment to be deleted. I'll
> investigate a bit more and report what I find on the JIRA.
>
> -Jason
>
> On Fri, Feb 5, 2016 at 9:50 AM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > I've updated Kafka-3159 with my findings.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> >
> > > I think I found out when the problem happens. When a broker that is
> sent
> > a
> > > fetch request has no messages for any of the partitions it is being
> asked
> > > messages for, it returns immediately instead of waiting out the poll
> > > period. Both the kafka 0.9 consumer and my own hand written consumer
> > suffer
> > > the same problem caused by the broker returning immediately. Since my
> log
> > > retention time is only 10 minutes as soon as I stop all of my
> producers I
> > > only need to wait for 10 minutes for the logs to age out and for this
> > > problem to happen.
> > >
> > > Though it might not be a big problem for most people,it is quite
> > > conceivable that some users have low frequency topic-partitions that a
> > lot
> > > of clients listen to with big min_wait_time_ms parameters. Examples
> being
> > > some infrequent metadata update topic. My guess is that such use cases
> > > would actually cause the problem when that low frequency
> topic-partition
> > is
> > > isolated to a broker or two. It is especially harrowing because it goes
> > > against all intuition that a topic with no traffic should cause little
> to
> > > no over head.
> > >
> > > I'll update KAFKA-3159 with my findings, but it would be great to get
> > > confirmation that you can make this happen Jason.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > > On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> > >
> > >> I actually restarted my application with the consumer config I
> mentioned
> > >> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get
> it
> > >> to use high CPU any more :( Not quite sure about how to proceed. I'll
> > try
> > >> to shut down all producers and let the logs age out to see if the
> > problem
> > >> happens under those conditions.
> > >>
> > >> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com>
> > wrote:
> > >>
> > >>> Hey Jason,
> > >>>
> > >>> Yes I checked for error codes. There were none. The message was
> > >>> perfectly legal as parsed by my hand written parser. I also verified
> > the
> > >>> size of the response which was exactly the size of a response with an
> > empty
> > >>> message set per partition.
> > >>>
> > >>> The topic has 128 partitions and has a retention of 10 minutes and a
> > >>> replication factor of 3. The 128 partitions are divided amongst 3
> > brokers
> > >>> but I managed to replicate the problem of premature responses even
> > running
> > >>> my own code in a debugger connected to a locally running kafka
> > instance.
> > >>>
> > >>> I haven't made any changes to the topic configuration while running
> > >>> these tests. All the changes I have made are to the settings of my
> > fetch
> > >>> request i.e. min_bytes_per_fetch, max_wait_ms and
> > max_bytes_per_partition.
> > >>> I haven't exactly noted all the changes I made but I think I can try
> > to get
> > >>> my original configuration and see if that reproduces the problem both
> > for
> > >>> the consumer I wrote myself and the stock 0.9 consumer.
> > >>>
> > >>> I definitely saw empty responses being returned really quickly when
> > >>> running my own client locally (under a debugger) and so it's just a
> > theory
> > >>> that that might have been the problem being all those EOFExceptions.
> > >>>
> > >>> Rajiv
> > >>>
> > >>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> Hey Rajiv,
> > >>>>
> > >>>> Just to be clear, when you received the empty fetch response, did
> you
> > >>>> check
> > >>>> the error codes? It would help to also include some more information
> > >>>> (such
> > >>>> as broker and topic settings). If you can come up with a way to
> > >>>> reproduce
> > >>>> it, that will help immensely.
> > >>>>
> > >>>> Also, would you mind updating KAFKA-3159 with your findings about
> the
> > >>>> high
> > >>>> CPU issue? If the problem went away after a configuration change,
> does
> > >>>> it
> > >>>> come back when those changes are reverted?
> > >>>>
> > >>>> Thanks,
> > >>>> Jason
> > >>>>
> > >>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com>
> > >>>> wrote:
> > >>>>
> > >>>> > Indeed this seems to be the case. I am now running the client
> > >>>> mentioned in
> > >>>> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no
> > longer
> > >>>> > taking up high CPU. The high number of EOF exceptions are also
> gone.
> > >>>> It is
> > >>>> > performing very well now. I can't understand if the improvement is
> > >>>> because
> > >>>> > of my config  changes (min_bytes, max_bytes_per_partition, max
> wait
> > >>>> time)
> > >>>> > etc or because of a bug in the 0.9 broker. I have definitely
> under a
> > >>>> > debugger seen a problem where I was getting back empty messages
> from
> > >>>> the
> > >>>> > broker running locally. It might be worth creating a bug for this.
> > >>>> >
> > >>>> > Thanks,
> > >>>> > Rajiv
> > >>>> >
> > >>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
> > >>>> wrote:
> > >>>> >
> > >>>> > > And just like that it stopped happening even though I didn't
> > change
> > >>>> any
> > >>>> > of
> > >>>> > > my code. I had filed
> > >>>> https://issues.apache.org/jira/browse/KAFKA-3159
> > >>>> > > where the stock 0.9 kafka consumer was using very high CPU and
> > >>>> seeing a
> > >>>> > lot
> > >>>> > > of EOFExceptions on the same topic and partition. I wonder if it
> > was
> > >>>> > > hitting the same problem (lots of empty messages) even though we
> > >>>> asked
> > >>>> > the
> > >>>> > > broker to park the request till enough bytes came through.
> > >>>> > >
> > >>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <
> rajiv@signalfx.com>
> > >>>> wrote:
> > >>>> > >
> > >>>> > >> I am writing a Kafka consumer client using the document at
> > >>>> > >>
> > >>>> >
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >>>> > >>
> > >>>> > >> One place where I am having problems is the fetch request
> itself.
> > >>>> I am
> > >>>> > >> able to send fetch requests and can get fetch responses that I
> > can
> > >>>> parse
> > >>>> > >> properly, but it seems like the broker is not respecting my max
> > >>>> wait
> > >>>> > time
> > >>>> > >> and min fetch bytes parameters.
> > >>>> > >>
> > >>>> > >> To test this part I am sending in a fetch request for 128
> > >>>> partitions of
> > >>>> > a
> > >>>> > >> single topic  that hasn't seen any messages for a while and is
> > >>>> currently
> > >>>> > >> empty. All 128 partitions are on the same broker (running
> 0.9). I
> > >>>> would
> > >>>> > >> expect the broker to NOT send me any replies till my
> > >>>> max_wait_time_ms
> > >>>> > >> elapses but it is sending me a reply immediately. This reply is
> > >>>> empty
> > >>>> > (as
> > >>>> > >> expected) since the partitions have no data and I can parse the
> > >>>> data
> > >>>> > just
> > >>>> > >> fine but I don't understand why the broker is sending me a
> reply
> > >>>> > >> immediately instead of waiting long enough.
> > >>>> > >>
> > >>>> > >> Here is how I make a request:
> > >>>> > >>
> > >>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions)
> {
> > >>>> > >>     // This does the math to get the size required.
> > >>>> > >>     final int sizeRequired =
> > >>>> > >> numBytesRequiredForFetchRequest(numPartitions);
> > >>>> > >>     final ByteBuffer buffer =
> > >>>> ByteBuffer.allocateDirect(sizeRequired);
> > >>>> > >>     // Size field
> > >>>> > >>     int sizeField = sizeRequired - 4;
> > >>>> > >>     buffer.putInt(sizeField);
> > >>>> > >>     // API key.
> > >>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
> > >>>> > >>     // API version.
> > >>>> > >>     buffer.putShort((short) 0);
> > >>>> > >>     // Correlation id.
> > >>>> > >>     buffer.putInt(-3);  // Just a random correlation id.
> > >>>> > >>     // Client id.
> > >>>> > >>     buffer.putShort(numClientStringBytes); // The length of the
> > >>>> client
> > >>>> > >> string as a short.
> > >>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
> > >>>> > >>     // Replica id.
> > >>>> > >>     buffer.putInt(-1);  // As per the recommendation.
> > >>>> > >>     // Max wait time in ms.
> > >>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> > >>>> > >>     // Min bytes field size.
> > >>>> > >>     buffer.putInt(1000000);  // A big number.
> > >>>> > >>     // Num topics.
> > >>>> > >>     buffer.putInt(1); // A single topic.
> > >>>> > >>     // Topic string.
> > >>>> > >>     buffer.putShort(numTopicBytes); // The length of the topic
> > >>>> string as
> > >>>> > >> a short.
> > >>>> > >>     buffer.put(topicBytes); // The topic string bytes.
> > >>>> > >>     // Num partitions field.
> > >>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
> > >>>> > >>     for (int i = 0; i < numPartitions; i++) {
> > >>>> > >>       final int partitionId = i;
> > >>>> > >>       // partition number.
> > >>>> > >>       buffer.putInt(partitionId);
> > >>>> > >>       // offset.
> > >>>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have
> > an
> > >>>> array
> > >>>> > >> of longs to get this from.
> > >>>> > >>       // maxBytesPerPartition.
> > >>>> > >>       buffer.putInt(maxBytesPerPartition);
> > >>>> > >>     }
> > >>>> > >>
> > >>>> > >>     buffer.flip();
> > >>>> > >>
> > >>>> > >>     return buffer;
> > >>>> > >> }
> > >>>> > >>
> > >>>> > >> I get a response pretty much immediately when I write this
> > request
> > >>>> to
> > >>>> > the
> > >>>> > >> broker. The response parses just fine but has no actual non
> zero
> > >>>> size
> > >>>> > >> message sets.
> > >>>> > >>
> > >>>> > >> Thanks in advance.
> > >>>> > >> Rajiv
> > >>>> > >>
> > >>>> > >>
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
>

Re: Kafka protocol fetch request max wait.

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Rajiv,

Thanks for all the updates. I think I've been able to reproduce this. The
key seems to be waiting for the old log segment to be deleted. I'll
investigate a bit more and report what I find on the JIRA.

-Jason

On Fri, Feb 5, 2016 at 9:50 AM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I've updated Kafka-3159 with my findings.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > I think I found out when the problem happens. When a broker that is sent
> a
> > fetch request has no messages for any of the partitions it is being asked
> > messages for, it returns immediately instead of waiting out the poll
> > period. Both the kafka 0.9 consumer and my own hand written consumer
> suffer
> > the same problem caused by the broker returning immediately. Since my log
> > retention time is only 10 minutes as soon as I stop all of my producers I
> > only need to wait for 10 minutes for the logs to age out and for this
> > problem to happen.
> >
> > Though it might not be a big problem for most people,it is quite
> > conceivable that some users have low frequency topic-partitions that a
> lot
> > of clients listen to with big min_wait_time_ms parameters. Examples being
> > some infrequent metadata update topic. My guess is that such use cases
> > would actually cause the problem when that low frequency topic-partition
> is
> > isolated to a broker or two. It is especially harrowing because it goes
> > against all intuition that a topic with no traffic should cause little to
> > no over head.
> >
> > I'll update KAFKA-3159 with my findings, but it would be great to get
> > confirmation that you can make this happen Jason.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> >> I actually restarted my application with the consumer config I mentioned
> >> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it
> >> to use high CPU any more :( Not quite sure about how to proceed. I'll
> try
> >> to shut down all producers and let the logs age out to see if the
> problem
> >> happens under those conditions.
> >>
> >> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> >>
> >>> Hey Jason,
> >>>
> >>> Yes I checked for error codes. There were none. The message was
> >>> perfectly legal as parsed by my hand written parser. I also verified
> the
> >>> size of the response which was exactly the size of a response with an
> empty
> >>> message set per partition.
> >>>
> >>> The topic has 128 partitions and has a retention of 10 minutes and a
> >>> replication factor of 3. The 128 partitions are divided amongst 3
> brokers
> >>> but I managed to replicate the problem of premature responses even
> running
> >>> my own code in a debugger connected to a locally running kafka
> instance.
> >>>
> >>> I haven't made any changes to the topic configuration while running
> >>> these tests. All the changes I have made are to the settings of my
> fetch
> >>> request i.e. min_bytes_per_fetch, max_wait_ms and
> max_bytes_per_partition.
> >>> I haven't exactly noted all the changes I made but I think I can try
> to get
> >>> my original configuration and see if that reproduces the problem both
> for
> >>> the consumer I wrote myself and the stock 0.9 consumer.
> >>>
> >>> I definitely saw empty responses being returned really quickly when
> >>> running my own client locally (under a debugger) and so it's just a
> theory
> >>> that that might have been the problem being all those EOFExceptions.
> >>>
> >>> Rajiv
> >>>
> >>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
> >>> wrote:
> >>>
> >>>> Hey Rajiv,
> >>>>
> >>>> Just to be clear, when you received the empty fetch response, did you
> >>>> check
> >>>> the error codes? It would help to also include some more information
> >>>> (such
> >>>> as broker and topic settings). If you can come up with a way to
> >>>> reproduce
> >>>> it, that will help immensely.
> >>>>
> >>>> Also, would you mind updating KAFKA-3159 with your findings about the
> >>>> high
> >>>> CPU issue? If the problem went away after a configuration change, does
> >>>> it
> >>>> come back when those changes are reverted?
> >>>>
> >>>> Thanks,
> >>>> Jason
> >>>>
> >>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com>
> >>>> wrote:
> >>>>
> >>>> > Indeed this seems to be the case. I am now running the client
> >>>> mentioned in
> >>>> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no
> longer
> >>>> > taking up high CPU. The high number of EOF exceptions are also gone.
> >>>> It is
> >>>> > performing very well now. I can't understand if the improvement is
> >>>> because
> >>>> > of my config  changes (min_bytes, max_bytes_per_partition, max wait
> >>>> time)
> >>>> > etc or because of a bug in the 0.9 broker. I have definitely under a
> >>>> > debugger seen a problem where I was getting back empty messages from
> >>>> the
> >>>> > broker running locally. It might be worth creating a bug for this.
> >>>> >
> >>>> > Thanks,
> >>>> > Rajiv
> >>>> >
> >>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
> >>>> wrote:
> >>>> >
> >>>> > > And just like that it stopped happening even though I didn't
> change
> >>>> any
> >>>> > of
> >>>> > > my code. I had filed
> >>>> https://issues.apache.org/jira/browse/KAFKA-3159
> >>>> > > where the stock 0.9 kafka consumer was using very high CPU and
> >>>> seeing a
> >>>> > lot
> >>>> > > of EOFExceptions on the same topic and partition. I wonder if it
> was
> >>>> > > hitting the same problem (lots of empty messages) even though we
> >>>> asked
> >>>> > the
> >>>> > > broker to park the request till enough bytes came through.
> >>>> > >
> >>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
> >>>> wrote:
> >>>> > >
> >>>> > >> I am writing a Kafka consumer client using the document at
> >>>> > >>
> >>>> >
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >>>> > >>
> >>>> > >> One place where I am having problems is the fetch request itself.
> >>>> I am
> >>>> > >> able to send fetch requests and can get fetch responses that I
> can
> >>>> parse
> >>>> > >> properly, but it seems like the broker is not respecting my max
> >>>> wait
> >>>> > time
> >>>> > >> and min fetch bytes parameters.
> >>>> > >>
> >>>> > >> To test this part I am sending in a fetch request for 128
> >>>> partitions of
> >>>> > a
> >>>> > >> single topic  that hasn't seen any messages for a while and is
> >>>> currently
> >>>> > >> empty. All 128 partitions are on the same broker (running 0.9). I
> >>>> would
> >>>> > >> expect the broker to NOT send me any replies till my
> >>>> max_wait_time_ms
> >>>> > >> elapses but it is sending me a reply immediately. This reply is
> >>>> empty
> >>>> > (as
> >>>> > >> expected) since the partitions have no data and I can parse the
> >>>> data
> >>>> > just
> >>>> > >> fine but I don't understand why the broker is sending me a reply
> >>>> > >> immediately instead of waiting long enough.
> >>>> > >>
> >>>> > >> Here is how I make a request:
> >>>> > >>
> >>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> >>>> > >>     // This does the math to get the size required.
> >>>> > >>     final int sizeRequired =
> >>>> > >> numBytesRequiredForFetchRequest(numPartitions);
> >>>> > >>     final ByteBuffer buffer =
> >>>> ByteBuffer.allocateDirect(sizeRequired);
> >>>> > >>     // Size field
> >>>> > >>     int sizeField = sizeRequired - 4;
> >>>> > >>     buffer.putInt(sizeField);
> >>>> > >>     // API key.
> >>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
> >>>> > >>     // API version.
> >>>> > >>     buffer.putShort((short) 0);
> >>>> > >>     // Correlation id.
> >>>> > >>     buffer.putInt(-3);  // Just a random correlation id.
> >>>> > >>     // Client id.
> >>>> > >>     buffer.putShort(numClientStringBytes); // The length of the
> >>>> client
> >>>> > >> string as a short.
> >>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
> >>>> > >>     // Replica id.
> >>>> > >>     buffer.putInt(-1);  // As per the recommendation.
> >>>> > >>     // Max wait time in ms.
> >>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> >>>> > >>     // Min bytes field size.
> >>>> > >>     buffer.putInt(1000000);  // A big number.
> >>>> > >>     // Num topics.
> >>>> > >>     buffer.putInt(1); // A single topic.
> >>>> > >>     // Topic string.
> >>>> > >>     buffer.putShort(numTopicBytes); // The length of the topic
> >>>> string as
> >>>> > >> a short.
> >>>> > >>     buffer.put(topicBytes); // The topic string bytes.
> >>>> > >>     // Num partitions field.
> >>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
> >>>> > >>     for (int i = 0; i < numPartitions; i++) {
> >>>> > >>       final int partitionId = i;
> >>>> > >>       // partition number.
> >>>> > >>       buffer.putInt(partitionId);
> >>>> > >>       // offset.
> >>>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have
> an
> >>>> array
> >>>> > >> of longs to get this from.
> >>>> > >>       // maxBytesPerPartition.
> >>>> > >>       buffer.putInt(maxBytesPerPartition);
> >>>> > >>     }
> >>>> > >>
> >>>> > >>     buffer.flip();
> >>>> > >>
> >>>> > >>     return buffer;
> >>>> > >> }
> >>>> > >>
> >>>> > >> I get a response pretty much immediately when I write this
> request
> >>>> to
> >>>> > the
> >>>> > >> broker. The response parses just fine but has no actual non zero
> >>>> size
> >>>> > >> message sets.
> >>>> > >>
> >>>> > >> Thanks in advance.
> >>>> > >> Rajiv
> >>>> > >>
> >>>> > >>
> >>>> > >
> >>>> >
> >>>>
> >>>
> >>>
> >>
> >
>

Re: Kafka protocol fetch request max wait.

Posted by Ismael Juma <is...@juma.me.uk>.
Thanks for getting to the bottom of this Rajiv.

Ismael

On Fri, Feb 5, 2016 at 5:50 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I've updated Kafka-3159 with my findings.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > I think I found out when the problem happens. When a broker that is sent
> a
> > fetch request has no messages for any of the partitions it is being asked
> > messages for, it returns immediately instead of waiting out the poll
> > period. Both the kafka 0.9 consumer and my own hand written consumer
> suffer
> > the same problem caused by the broker returning immediately. Since my log
> > retention time is only 10 minutes as soon as I stop all of my producers I
> > only need to wait for 10 minutes for the logs to age out and for this
> > problem to happen.
> >
> > Though it might not be a big problem for most people,it is quite
> > conceivable that some users have low frequency topic-partitions that a
> lot
> > of clients listen to with big min_wait_time_ms parameters. Examples being
> > some infrequent metadata update topic. My guess is that such use cases
> > would actually cause the problem when that low frequency topic-partition
> is
> > isolated to a broker or two. It is especially harrowing because it goes
> > against all intuition that a topic with no traffic should cause little to
> > no over head.
> >
> > I'll update KAFKA-3159 with my findings, but it would be great to get
> > confirmation that you can make this happen Jason.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> >> I actually restarted my application with the consumer config I mentioned
> >> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it
> >> to use high CPU any more :( Not quite sure about how to proceed. I'll
> try
> >> to shut down all producers and let the logs age out to see if the
> problem
> >> happens under those conditions.
> >>
> >> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> >>
> >>> Hey Jason,
> >>>
> >>> Yes I checked for error codes. There were none. The message was
> >>> perfectly legal as parsed by my hand written parser. I also verified
> the
> >>> size of the response which was exactly the size of a response with an
> empty
> >>> message set per partition.
> >>>
> >>> The topic has 128 partitions and has a retention of 10 minutes and a
> >>> replication factor of 3. The 128 partitions are divided amongst 3
> brokers
> >>> but I managed to replicate the problem of premature responses even
> running
> >>> my own code in a debugger connected to a locally running kafka
> instance.
> >>>
> >>> I haven't made any changes to the topic configuration while running
> >>> these tests. All the changes I have made are to the settings of my
> fetch
> >>> request i.e. min_bytes_per_fetch, max_wait_ms and
> max_bytes_per_partition.
> >>> I haven't exactly noted all the changes I made but I think I can try
> to get
> >>> my original configuration and see if that reproduces the problem both
> for
> >>> the consumer I wrote myself and the stock 0.9 consumer.
> >>>
> >>> I definitely saw empty responses being returned really quickly when
> >>> running my own client locally (under a debugger) and so it's just a
> theory
> >>> that that might have been the problem being all those EOFExceptions.
> >>>
> >>> Rajiv
> >>>
> >>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
> >>> wrote:
> >>>
> >>>> Hey Rajiv,
> >>>>
> >>>> Just to be clear, when you received the empty fetch response, did you
> >>>> check
> >>>> the error codes? It would help to also include some more information
> >>>> (such
> >>>> as broker and topic settings). If you can come up with a way to
> >>>> reproduce
> >>>> it, that will help immensely.
> >>>>
> >>>> Also, would you mind updating KAFKA-3159 with your findings about the
> >>>> high
> >>>> CPU issue? If the problem went away after a configuration change, does
> >>>> it
> >>>> come back when those changes are reverted?
> >>>>
> >>>> Thanks,
> >>>> Jason
> >>>>
> >>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com>
> >>>> wrote:
> >>>>
> >>>> > Indeed this seems to be the case. I am now running the client
> >>>> mentioned in
> >>>> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no
> longer
> >>>> > taking up high CPU. The high number of EOF exceptions are also gone.
> >>>> It is
> >>>> > performing very well now. I can't understand if the improvement is
> >>>> because
> >>>> > of my config  changes (min_bytes, max_bytes_per_partition, max wait
> >>>> time)
> >>>> > etc or because of a bug in the 0.9 broker. I have definitely under a
> >>>> > debugger seen a problem where I was getting back empty messages from
> >>>> the
> >>>> > broker running locally. It might be worth creating a bug for this.
> >>>> >
> >>>> > Thanks,
> >>>> > Rajiv
> >>>> >
> >>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
> >>>> wrote:
> >>>> >
> >>>> > > And just like that it stopped happening even though I didn't
> change
> >>>> any
> >>>> > of
> >>>> > > my code. I had filed
> >>>> https://issues.apache.org/jira/browse/KAFKA-3159
> >>>> > > where the stock 0.9 kafka consumer was using very high CPU and
> >>>> seeing a
> >>>> > lot
> >>>> > > of EOFExceptions on the same topic and partition. I wonder if it
> was
> >>>> > > hitting the same problem (lots of empty messages) even though we
> >>>> asked
> >>>> > the
> >>>> > > broker to park the request till enough bytes came through.
> >>>> > >
> >>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
> >>>> wrote:
> >>>> > >
> >>>> > >> I am writing a Kafka consumer client using the document at
> >>>> > >>
> >>>> >
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >>>> > >>
> >>>> > >> One place where I am having problems is the fetch request itself.
> >>>> I am
> >>>> > >> able to send fetch requests and can get fetch responses that I
> can
> >>>> parse
> >>>> > >> properly, but it seems like the broker is not respecting my max
> >>>> wait
> >>>> > time
> >>>> > >> and min fetch bytes parameters.
> >>>> > >>
> >>>> > >> To test this part I am sending in a fetch request for 128
> >>>> partitions of
> >>>> > a
> >>>> > >> single topic  that hasn't seen any messages for a while and is
> >>>> currently
> >>>> > >> empty. All 128 partitions are on the same broker (running 0.9). I
> >>>> would
> >>>> > >> expect the broker to NOT send me any replies till my
> >>>> max_wait_time_ms
> >>>> > >> elapses but it is sending me a reply immediately. This reply is
> >>>> empty
> >>>> > (as
> >>>> > >> expected) since the partitions have no data and I can parse the
> >>>> data
> >>>> > just
> >>>> > >> fine but I don't understand why the broker is sending me a reply
> >>>> > >> immediately instead of waiting long enough.
> >>>> > >>
> >>>> > >> Here is how I make a request:
> >>>> > >>
> >>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> >>>> > >>     // This does the math to get the size required.
> >>>> > >>     final int sizeRequired =
> >>>> > >> numBytesRequiredForFetchRequest(numPartitions);
> >>>> > >>     final ByteBuffer buffer =
> >>>> ByteBuffer.allocateDirect(sizeRequired);
> >>>> > >>     // Size field
> >>>> > >>     int sizeField = sizeRequired - 4;
> >>>> > >>     buffer.putInt(sizeField);
> >>>> > >>     // API key.
> >>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
> >>>> > >>     // API version.
> >>>> > >>     buffer.putShort((short) 0);
> >>>> > >>     // Correlation id.
> >>>> > >>     buffer.putInt(-3);  // Just a random correlation id.
> >>>> > >>     // Client id.
> >>>> > >>     buffer.putShort(numClientStringBytes); // The length of the
> >>>> client
> >>>> > >> string as a short.
> >>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
> >>>> > >>     // Replica id.
> >>>> > >>     buffer.putInt(-1);  // As per the recommendation.
> >>>> > >>     // Max wait time in ms.
> >>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> >>>> > >>     // Min bytes field size.
> >>>> > >>     buffer.putInt(1000000);  // A big number.
> >>>> > >>     // Num topics.
> >>>> > >>     buffer.putInt(1); // A single topic.
> >>>> > >>     // Topic string.
> >>>> > >>     buffer.putShort(numTopicBytes); // The length of the topic
> >>>> string as
> >>>> > >> a short.
> >>>> > >>     buffer.put(topicBytes); // The topic string bytes.
> >>>> > >>     // Num partitions field.
> >>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
> >>>> > >>     for (int i = 0; i < numPartitions; i++) {
> >>>> > >>       final int partitionId = i;
> >>>> > >>       // partition number.
> >>>> > >>       buffer.putInt(partitionId);
> >>>> > >>       // offset.
> >>>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have
> an
> >>>> array
> >>>> > >> of longs to get this from.
> >>>> > >>       // maxBytesPerPartition.
> >>>> > >>       buffer.putInt(maxBytesPerPartition);
> >>>> > >>     }
> >>>> > >>
> >>>> > >>     buffer.flip();
> >>>> > >>
> >>>> > >>     return buffer;
> >>>> > >> }
> >>>> > >>
> >>>> > >> I get a response pretty much immediately when I write this
> request
> >>>> to
> >>>> > the
> >>>> > >> broker. The response parses just fine but has no actual non zero
> >>>> size
> >>>> > >> message sets.
> >>>> > >>
> >>>> > >> Thanks in advance.
> >>>> > >> Rajiv
> >>>> > >>
> >>>> > >>
> >>>> > >
> >>>> >
> >>>>
> >>>
> >>>
> >>
> >
>

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
I've updated Kafka-3159 with my findings.

Thanks,
Rajiv

On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I think I found out when the problem happens. When a broker that is sent a
> fetch request has no messages for any of the partitions it is being asked
> messages for, it returns immediately instead of waiting out the poll
> period. Both the kafka 0.9 consumer and my own hand written consumer suffer
> the same problem caused by the broker returning immediately. Since my log
> retention time is only 10 minutes as soon as I stop all of my producers I
> only need to wait for 10 minutes for the logs to age out and for this
> problem to happen.
>
> Though it might not be a big problem for most people,it is quite
> conceivable that some users have low frequency topic-partitions that a lot
> of clients listen to with big min_wait_time_ms parameters. Examples being
> some infrequent metadata update topic. My guess is that such use cases
> would actually cause the problem when that low frequency topic-partition is
> isolated to a broker or two. It is especially harrowing because it goes
> against all intuition that a topic with no traffic should cause little to
> no over head.
>
> I'll update KAFKA-3159 with my findings, but it would be great to get
> confirmation that you can make this happen Jason.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> I actually restarted my application with the consumer config I mentioned
>> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it
>> to use high CPU any more :( Not quite sure about how to proceed. I'll try
>> to shut down all producers and let the logs age out to see if the problem
>> happens under those conditions.
>>
>> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>
>>> Hey Jason,
>>>
>>> Yes I checked for error codes. There were none. The message was
>>> perfectly legal as parsed by my hand written parser. I also verified the
>>> size of the response which was exactly the size of a response with an empty
>>> message set per partition.
>>>
>>> The topic has 128 partitions and has a retention of 10 minutes and a
>>> replication factor of 3. The 128 partitions are divided amongst 3 brokers
>>> but I managed to replicate the problem of premature responses even running
>>> my own code in a debugger connected to a locally running kafka instance.
>>>
>>> I haven't made any changes to the topic configuration while running
>>> these tests. All the changes I have made are to the settings of my fetch
>>> request i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition.
>>> I haven't exactly noted all the changes I made but I think I can try to get
>>> my original configuration and see if that reproduces the problem both for
>>> the consumer I wrote myself and the stock 0.9 consumer.
>>>
>>> I definitely saw empty responses being returned really quickly when
>>> running my own client locally (under a debugger) and so it's just a theory
>>> that that might have been the problem being all those EOFExceptions.
>>>
>>> Rajiv
>>>
>>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
>>> wrote:
>>>
>>>> Hey Rajiv,
>>>>
>>>> Just to be clear, when you received the empty fetch response, did you
>>>> check
>>>> the error codes? It would help to also include some more information
>>>> (such
>>>> as broker and topic settings). If you can come up with a way to
>>>> reproduce
>>>> it, that will help immensely.
>>>>
>>>> Also, would you mind updating KAFKA-3159 with your findings about the
>>>> high
>>>> CPU issue? If the problem went away after a configuration change, does
>>>> it
>>>> come back when those changes are reverted?
>>>>
>>>> Thanks,
>>>> Jason
>>>>
>>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com>
>>>> wrote:
>>>>
>>>> > Indeed this seems to be the case. I am now running the client
>>>> mentioned in
>>>> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
>>>> > taking up high CPU. The high number of EOF exceptions are also gone.
>>>> It is
>>>> > performing very well now. I can't understand if the improvement is
>>>> because
>>>> > of my config  changes (min_bytes, max_bytes_per_partition, max wait
>>>> time)
>>>> > etc or because of a bug in the 0.9 broker. I have definitely under a
>>>> > debugger seen a problem where I was getting back empty messages from
>>>> the
>>>> > broker running locally. It might be worth creating a bug for this.
>>>> >
>>>> > Thanks,
>>>> > Rajiv
>>>> >
>>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
>>>> wrote:
>>>> >
>>>> > > And just like that it stopped happening even though I didn't change
>>>> any
>>>> > of
>>>> > > my code. I had filed
>>>> https://issues.apache.org/jira/browse/KAFKA-3159
>>>> > > where the stock 0.9 kafka consumer was using very high CPU and
>>>> seeing a
>>>> > lot
>>>> > > of EOFExceptions on the same topic and partition. I wonder if it was
>>>> > > hitting the same problem (lots of empty messages) even though we
>>>> asked
>>>> > the
>>>> > > broker to park the request till enough bytes came through.
>>>> > >
>>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
>>>> wrote:
>>>> > >
>>>> > >> I am writing a Kafka consumer client using the document at
>>>> > >>
>>>> >
>>>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>>> > >>
>>>> > >> One place where I am having problems is the fetch request itself.
>>>> I am
>>>> > >> able to send fetch requests and can get fetch responses that I can
>>>> parse
>>>> > >> properly, but it seems like the broker is not respecting my max
>>>> wait
>>>> > time
>>>> > >> and min fetch bytes parameters.
>>>> > >>
>>>> > >> To test this part I am sending in a fetch request for 128
>>>> partitions of
>>>> > a
>>>> > >> single topic  that hasn't seen any messages for a while and is
>>>> currently
>>>> > >> empty. All 128 partitions are on the same broker (running 0.9). I
>>>> would
>>>> > >> expect the broker to NOT send me any replies till my
>>>> max_wait_time_ms
>>>> > >> elapses but it is sending me a reply immediately. This reply is
>>>> empty
>>>> > (as
>>>> > >> expected) since the partitions have no data and I can parse the
>>>> data
>>>> > just
>>>> > >> fine but I don't understand why the broker is sending me a reply
>>>> > >> immediately instead of waiting long enough.
>>>> > >>
>>>> > >> Here is how I make a request:
>>>> > >>
>>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>>> > >>     // This does the math to get the size required.
>>>> > >>     final int sizeRequired =
>>>> > >> numBytesRequiredForFetchRequest(numPartitions);
>>>> > >>     final ByteBuffer buffer =
>>>> ByteBuffer.allocateDirect(sizeRequired);
>>>> > >>     // Size field
>>>> > >>     int sizeField = sizeRequired - 4;
>>>> > >>     buffer.putInt(sizeField);
>>>> > >>     // API key.
>>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>>>> > >>     // API version.
>>>> > >>     buffer.putShort((short) 0);
>>>> > >>     // Correlation id.
>>>> > >>     buffer.putInt(-3);  // Just a random correlation id.
>>>> > >>     // Client id.
>>>> > >>     buffer.putShort(numClientStringBytes); // The length of the
>>>> client
>>>> > >> string as a short.
>>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
>>>> > >>     // Replica id.
>>>> > >>     buffer.putInt(-1);  // As per the recommendation.
>>>> > >>     // Max wait time in ms.
>>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>>>> > >>     // Min bytes field size.
>>>> > >>     buffer.putInt(1000000);  // A big number.
>>>> > >>     // Num topics.
>>>> > >>     buffer.putInt(1); // A single topic.
>>>> > >>     // Topic string.
>>>> > >>     buffer.putShort(numTopicBytes); // The length of the topic
>>>> string as
>>>> > >> a short.
>>>> > >>     buffer.put(topicBytes); // The topic string bytes.
>>>> > >>     // Num partitions field.
>>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
>>>> > >>     for (int i = 0; i < numPartitions; i++) {
>>>> > >>       final int partitionId = i;
>>>> > >>       // partition number.
>>>> > >>       buffer.putInt(partitionId);
>>>> > >>       // offset.
>>>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have an
>>>> array
>>>> > >> of longs to get this from.
>>>> > >>       // maxBytesPerPartition.
>>>> > >>       buffer.putInt(maxBytesPerPartition);
>>>> > >>     }
>>>> > >>
>>>> > >>     buffer.flip();
>>>> > >>
>>>> > >>     return buffer;
>>>> > >> }
>>>> > >>
>>>> > >> I get a response pretty much immediately when I write this request
>>>> to
>>>> > the
>>>> > >> broker. The response parses just fine but has no actual non zero
>>>> size
>>>> > >> message sets.
>>>> > >>
>>>> > >> Thanks in advance.
>>>> > >> Rajiv
>>>> > >>
>>>> > >>
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
I think I found out when the problem happens. When a broker that is sent a
fetch request has no messages for any of the partitions it is being asked
messages for, it returns immediately instead of waiting out the poll
period. Both the kafka 0.9 consumer and my own hand written consumer suffer
the same problem caused by the broker returning immediately. Since my log
retention time is only 10 minutes as soon as I stop all of my producers I
only need to wait for 10 minutes for the logs to age out and for this
problem to happen.

Though it might not be a big problem for most people,it is quite
conceivable that some users have low frequency topic-partitions that a lot
of clients listen to with big min_wait_time_ms parameters. Examples being
some infrequent metadata update topic. My guess is that such use cases
would actually cause the problem when that low frequency topic-partition is
isolated to a broker or two. It is especially harrowing because it goes
against all intuition that a topic with no traffic should cause little to
no over head.

I'll update KAFKA-3159 with my findings, but it would be great to get
confirmation that you can make this happen Jason.

Thanks,
Rajiv

On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I actually restarted my application with the consumer config I mentioned
> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it to
> use high CPU any more :( Not quite sure about how to proceed. I'll try to
> shut down all producers and let the logs age out to see if the problem
> happens under those conditions.
>
> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> Hey Jason,
>>
>> Yes I checked for error codes. There were none. The message was perfectly
>> legal as parsed by my hand written parser. I also verified the size of the
>> response which was exactly the size of a response with an empty message set
>> per partition.
>>
>> The topic has 128 partitions and has a retention of 10 minutes and a
>> replication factor of 3. The 128 partitions are divided amongst 3 brokers
>> but I managed to replicate the problem of premature responses even running
>> my own code in a debugger connected to a locally running kafka instance.
>>
>> I haven't made any changes to the topic configuration while running these
>> tests. All the changes I have made are to the settings of my fetch request
>> i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition. I
>> haven't exactly noted all the changes I made but I think I can try to get
>> my original configuration and see if that reproduces the problem both for
>> the consumer I wrote myself and the stock 0.9 consumer.
>>
>> I definitely saw empty responses being returned really quickly when
>> running my own client locally (under a debugger) and so it's just a theory
>> that that might have been the problem being all those EOFExceptions.
>>
>> Rajiv
>>
>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>>> Hey Rajiv,
>>>
>>> Just to be clear, when you received the empty fetch response, did you
>>> check
>>> the error codes? It would help to also include some more information
>>> (such
>>> as broker and topic settings). If you can come up with a way to reproduce
>>> it, that will help immensely.
>>>
>>> Also, would you mind updating KAFKA-3159 with your findings about the
>>> high
>>> CPU issue? If the problem went away after a configuration change, does it
>>> come back when those changes are reverted?
>>>
>>> Thanks,
>>> Jason
>>>
>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>
>>> > Indeed this seems to be the case. I am now running the client
>>> mentioned in
>>> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
>>> > taking up high CPU. The high number of EOF exceptions are also gone.
>>> It is
>>> > performing very well now. I can't understand if the improvement is
>>> because
>>> > of my config  changes (min_bytes, max_bytes_per_partition, max wait
>>> time)
>>> > etc or because of a bug in the 0.9 broker. I have definitely under a
>>> > debugger seen a problem where I was getting back empty messages from
>>> the
>>> > broker running locally. It might be worth creating a bug for this.
>>> >
>>> > Thanks,
>>> > Rajiv
>>> >
>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
>>> wrote:
>>> >
>>> > > And just like that it stopped happening even though I didn't change
>>> any
>>> > of
>>> > > my code. I had filed
>>> https://issues.apache.org/jira/browse/KAFKA-3159
>>> > > where the stock 0.9 kafka consumer was using very high CPU and
>>> seeing a
>>> > lot
>>> > > of EOFExceptions on the same topic and partition. I wonder if it was
>>> > > hitting the same problem (lots of empty messages) even though we
>>> asked
>>> > the
>>> > > broker to park the request till enough bytes came through.
>>> > >
>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
>>> wrote:
>>> > >
>>> > >> I am writing a Kafka consumer client using the document at
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>> > >>
>>> > >> One place where I am having problems is the fetch request itself. I
>>> am
>>> > >> able to send fetch requests and can get fetch responses that I can
>>> parse
>>> > >> properly, but it seems like the broker is not respecting my max wait
>>> > time
>>> > >> and min fetch bytes parameters.
>>> > >>
>>> > >> To test this part I am sending in a fetch request for 128
>>> partitions of
>>> > a
>>> > >> single topic  that hasn't seen any messages for a while and is
>>> currently
>>> > >> empty. All 128 partitions are on the same broker (running 0.9). I
>>> would
>>> > >> expect the broker to NOT send me any replies till my
>>> max_wait_time_ms
>>> > >> elapses but it is sending me a reply immediately. This reply is
>>> empty
>>> > (as
>>> > >> expected) since the partitions have no data and I can parse the data
>>> > just
>>> > >> fine but I don't understand why the broker is sending me a reply
>>> > >> immediately instead of waiting long enough.
>>> > >>
>>> > >> Here is how I make a request:
>>> > >>
>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>> > >>     // This does the math to get the size required.
>>> > >>     final int sizeRequired =
>>> > >> numBytesRequiredForFetchRequest(numPartitions);
>>> > >>     final ByteBuffer buffer =
>>> ByteBuffer.allocateDirect(sizeRequired);
>>> > >>     // Size field
>>> > >>     int sizeField = sizeRequired - 4;
>>> > >>     buffer.putInt(sizeField);
>>> > >>     // API key.
>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>>> > >>     // API version.
>>> > >>     buffer.putShort((short) 0);
>>> > >>     // Correlation id.
>>> > >>     buffer.putInt(-3);  // Just a random correlation id.
>>> > >>     // Client id.
>>> > >>     buffer.putShort(numClientStringBytes); // The length of the
>>> client
>>> > >> string as a short.
>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
>>> > >>     // Replica id.
>>> > >>     buffer.putInt(-1);  // As per the recommendation.
>>> > >>     // Max wait time in ms.
>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>>> > >>     // Min bytes field size.
>>> > >>     buffer.putInt(1000000);  // A big number.
>>> > >>     // Num topics.
>>> > >>     buffer.putInt(1); // A single topic.
>>> > >>     // Topic string.
>>> > >>     buffer.putShort(numTopicBytes); // The length of the topic
>>> string as
>>> > >> a short.
>>> > >>     buffer.put(topicBytes); // The topic string bytes.
>>> > >>     // Num partitions field.
>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
>>> > >>     for (int i = 0; i < numPartitions; i++) {
>>> > >>       final int partitionId = i;
>>> > >>       // partition number.
>>> > >>       buffer.putInt(partitionId);
>>> > >>       // offset.
>>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have an
>>> array
>>> > >> of longs to get this from.
>>> > >>       // maxBytesPerPartition.
>>> > >>       buffer.putInt(maxBytesPerPartition);
>>> > >>     }
>>> > >>
>>> > >>     buffer.flip();
>>> > >>
>>> > >>     return buffer;
>>> > >> }
>>> > >>
>>> > >> I get a response pretty much immediately when I write this request
>>> to
>>> > the
>>> > >> broker. The response parses just fine but has no actual non zero
>>> size
>>> > >> message sets.
>>> > >>
>>> > >> Thanks in advance.
>>> > >> Rajiv
>>> > >>
>>> > >>
>>> > >
>>> >
>>>
>>
>>
>

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
I actually restarted my application with the consumer config I mentioned at
https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it to use
high CPU any more :( Not quite sure about how to proceed. I'll try to shut
down all producers and let the logs age out to see if the problem happens
under those conditions.

On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Hey Jason,
>
> Yes I checked for error codes. There were none. The message was perfectly
> legal as parsed by my hand written parser. I also verified the size of the
> response which was exactly the size of a response with an empty message set
> per partition.
>
> The topic has 128 partitions and has a retention of 10 minutes and a
> replication factor of 3. The 128 partitions are divided amongst 3 brokers
> but I managed to replicate the problem of premature responses even running
> my own code in a debugger connected to a locally running kafka instance.
>
> I haven't made any changes to the topic configuration while running these
> tests. All the changes I have made are to the settings of my fetch request
> i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition. I
> haven't exactly noted all the changes I made but I think I can try to get
> my original configuration and see if that reproduces the problem both for
> the consumer I wrote myself and the stock 0.9 consumer.
>
> I definitely saw empty responses being returned really quickly when
> running my own client locally (under a debugger) and so it's just a theory
> that that might have been the problem being all those EOFExceptions.
>
> Rajiv
>
> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
>> Hey Rajiv,
>>
>> Just to be clear, when you received the empty fetch response, did you
>> check
>> the error codes? It would help to also include some more information (such
>> as broker and topic settings). If you can come up with a way to reproduce
>> it, that will help immensely.
>>
>> Also, would you mind updating KAFKA-3159 with your findings about the high
>> CPU issue? If the problem went away after a configuration change, does it
>> come back when those changes are reverted?
>>
>> Thanks,
>> Jason
>>
>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>
>> > Indeed this seems to be the case. I am now running the client mentioned
>> in
>> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
>> > taking up high CPU. The high number of EOF exceptions are also gone. It
>> is
>> > performing very well now. I can't understand if the improvement is
>> because
>> > of my config  changes (min_bytes, max_bytes_per_partition, max wait
>> time)
>> > etc or because of a bug in the 0.9 broker. I have definitely under a
>> > debugger seen a problem where I was getting back empty messages from the
>> > broker running locally. It might be worth creating a bug for this.
>> >
>> > Thanks,
>> > Rajiv
>> >
>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
>> wrote:
>> >
>> > > And just like that it stopped happening even though I didn't change
>> any
>> > of
>> > > my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
>> > > where the stock 0.9 kafka consumer was using very high CPU and seeing
>> a
>> > lot
>> > > of EOFExceptions on the same topic and partition. I wonder if it was
>> > > hitting the same problem (lots of empty messages) even though we asked
>> > the
>> > > broker to park the request till enough bytes came through.
>> > >
>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
>> wrote:
>> > >
>> > >> I am writing a Kafka consumer client using the document at
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> > >>
>> > >> One place where I am having problems is the fetch request itself. I
>> am
>> > >> able to send fetch requests and can get fetch responses that I can
>> parse
>> > >> properly, but it seems like the broker is not respecting my max wait
>> > time
>> > >> and min fetch bytes parameters.
>> > >>
>> > >> To test this part I am sending in a fetch request for 128 partitions
>> of
>> > a
>> > >> single topic  that hasn't seen any messages for a while and is
>> currently
>> > >> empty. All 128 partitions are on the same broker (running 0.9). I
>> would
>> > >> expect the broker to NOT send me any replies till my max_wait_time_ms
>> > >> elapses but it is sending me a reply immediately. This reply is empty
>> > (as
>> > >> expected) since the partitions have no data and I can parse the data
>> > just
>> > >> fine but I don't understand why the broker is sending me a reply
>> > >> immediately instead of waiting long enough.
>> > >>
>> > >> Here is how I make a request:
>> > >>
>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>> > >>     // This does the math to get the size required.
>> > >>     final int sizeRequired =
>> > >> numBytesRequiredForFetchRequest(numPartitions);
>> > >>     final ByteBuffer buffer =
>> ByteBuffer.allocateDirect(sizeRequired);
>> > >>     // Size field
>> > >>     int sizeField = sizeRequired - 4;
>> > >>     buffer.putInt(sizeField);
>> > >>     // API key.
>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>> > >>     // API version.
>> > >>     buffer.putShort((short) 0);
>> > >>     // Correlation id.
>> > >>     buffer.putInt(-3);  // Just a random correlation id.
>> > >>     // Client id.
>> > >>     buffer.putShort(numClientStringBytes); // The length of the
>> client
>> > >> string as a short.
>> > >>     buffer.put(clientStringBytes); // The client string bytes.
>> > >>     // Replica id.
>> > >>     buffer.putInt(-1);  // As per the recommendation.
>> > >>     // Max wait time in ms.
>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>> > >>     // Min bytes field size.
>> > >>     buffer.putInt(1000000);  // A big number.
>> > >>     // Num topics.
>> > >>     buffer.putInt(1); // A single topic.
>> > >>     // Topic string.
>> > >>     buffer.putShort(numTopicBytes); // The length of the topic
>> string as
>> > >> a short.
>> > >>     buffer.put(topicBytes); // The topic string bytes.
>> > >>     // Num partitions field.
>> > >>     buffer.putInt(numPartitions); // 128 like I said.
>> > >>     for (int i = 0; i < numPartitions; i++) {
>> > >>       final int partitionId = i;
>> > >>       // partition number.
>> > >>       buffer.putInt(partitionId);
>> > >>       // offset.
>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have an
>> array
>> > >> of longs to get this from.
>> > >>       // maxBytesPerPartition.
>> > >>       buffer.putInt(maxBytesPerPartition);
>> > >>     }
>> > >>
>> > >>     buffer.flip();
>> > >>
>> > >>     return buffer;
>> > >> }
>> > >>
>> > >> I get a response pretty much immediately when I write this request to
>> > the
>> > >> broker. The response parses just fine but has no actual non zero size
>> > >> message sets.
>> > >>
>> > >> Thanks in advance.
>> > >> Rajiv
>> > >>
>> > >>
>> > >
>> >
>>
>
>

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
Hey Jason,

Yes I checked for error codes. There were none. The message was perfectly
legal as parsed by my hand written parser. I also verified the size of the
response which was exactly the size of a response with an empty message set
per partition.

The topic has 128 partitions and has a retention of 10 minutes and a
replication factor of 3. The 128 partitions are divided amongst 3 brokers
but I managed to replicate the problem of premature responses even running
my own code in a debugger connected to a locally running kafka instance.

I haven't made any changes to the topic configuration while running these
tests. All the changes I have made are to the settings of my fetch request
i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition. I
haven't exactly noted all the changes I made but I think I can try to get
my original configuration and see if that reproduces the problem both for
the consumer I wrote myself and the stock 0.9 consumer.

I definitely saw empty responses being returned really quickly when running
my own client locally (under a debugger) and so it's just a theory that
that might have been the problem being all those EOFExceptions.

Rajiv

On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> Just to be clear, when you received the empty fetch response, did you check
> the error codes? It would help to also include some more information (such
> as broker and topic settings). If you can come up with a way to reproduce
> it, that will help immensely.
>
> Also, would you mind updating KAFKA-3159 with your findings about the high
> CPU issue? If the problem went away after a configuration change, does it
> come back when those changes are reverted?
>
> Thanks,
> Jason
>
> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > Indeed this seems to be the case. I am now running the client mentioned
> in
> > https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
> > taking up high CPU. The high number of EOF exceptions are also gone. It
> is
> > performing very well now. I can't understand if the improvement is
> because
> > of my config  changes (min_bytes, max_bytes_per_partition, max wait time)
> > etc or because of a bug in the 0.9 broker. I have definitely under a
> > debugger seen a problem where I was getting back empty messages from the
> > broker running locally. It might be worth creating a bug for this.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> > > And just like that it stopped happening even though I didn't change any
> > of
> > > my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> > > where the stock 0.9 kafka consumer was using very high CPU and seeing a
> > lot
> > > of EOFExceptions on the same topic and partition. I wonder if it was
> > > hitting the same problem (lots of empty messages) even though we asked
> > the
> > > broker to park the request till enough bytes came through.
> > >
> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> > >
> > >> I am writing a Kafka consumer client using the document at
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >>
> > >> One place where I am having problems is the fetch request itself. I am
> > >> able to send fetch requests and can get fetch responses that I can
> parse
> > >> properly, but it seems like the broker is not respecting my max wait
> > time
> > >> and min fetch bytes parameters.
> > >>
> > >> To test this part I am sending in a fetch request for 128 partitions
> of
> > a
> > >> single topic  that hasn't seen any messages for a while and is
> currently
> > >> empty. All 128 partitions are on the same broker (running 0.9). I
> would
> > >> expect the broker to NOT send me any replies till my max_wait_time_ms
> > >> elapses but it is sending me a reply immediately. This reply is empty
> > (as
> > >> expected) since the partitions have no data and I can parse the data
> > just
> > >> fine but I don't understand why the broker is sending me a reply
> > >> immediately instead of waiting long enough.
> > >>
> > >> Here is how I make a request:
> > >>
> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> > >>     // This does the math to get the size required.
> > >>     final int sizeRequired =
> > >> numBytesRequiredForFetchRequest(numPartitions);
> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> > >>     // Size field
> > >>     int sizeField = sizeRequired - 4;
> > >>     buffer.putInt(sizeField);
> > >>     // API key.
> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
> > >>     // API version.
> > >>     buffer.putShort((short) 0);
> > >>     // Correlation id.
> > >>     buffer.putInt(-3);  // Just a random correlation id.
> > >>     // Client id.
> > >>     buffer.putShort(numClientStringBytes); // The length of the client
> > >> string as a short.
> > >>     buffer.put(clientStringBytes); // The client string bytes.
> > >>     // Replica id.
> > >>     buffer.putInt(-1);  // As per the recommendation.
> > >>     // Max wait time in ms.
> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> > >>     // Min bytes field size.
> > >>     buffer.putInt(1000000);  // A big number.
> > >>     // Num topics.
> > >>     buffer.putInt(1); // A single topic.
> > >>     // Topic string.
> > >>     buffer.putShort(numTopicBytes); // The length of the topic string
> as
> > >> a short.
> > >>     buffer.put(topicBytes); // The topic string bytes.
> > >>     // Num partitions field.
> > >>     buffer.putInt(numPartitions); // 128 like I said.
> > >>     for (int i = 0; i < numPartitions; i++) {
> > >>       final int partitionId = i;
> > >>       // partition number.
> > >>       buffer.putInt(partitionId);
> > >>       // offset.
> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have an
> array
> > >> of longs to get this from.
> > >>       // maxBytesPerPartition.
> > >>       buffer.putInt(maxBytesPerPartition);
> > >>     }
> > >>
> > >>     buffer.flip();
> > >>
> > >>     return buffer;
> > >> }
> > >>
> > >> I get a response pretty much immediately when I write this request to
> > the
> > >> broker. The response parses just fine but has no actual non zero size
> > >> message sets.
> > >>
> > >> Thanks in advance.
> > >> Rajiv
> > >>
> > >>
> > >
> >
>

Re: Kafka protocol fetch request max wait.

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Rajiv,

Just to be clear, when you received the empty fetch response, did you check
the error codes? It would help to also include some more information (such
as broker and topic settings). If you can come up with a way to reproduce
it, that will help immensely.

Also, would you mind updating KAFKA-3159 with your findings about the high
CPU issue? If the problem went away after a configuration change, does it
come back when those changes are reverted?

Thanks,
Jason

On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Indeed this seems to be the case. I am now running the client mentioned in
> https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
> taking up high CPU. The high number of EOF exceptions are also gone. It is
> performing very well now. I can't understand if the improvement is because
> of my config  changes (min_bytes, max_bytes_per_partition, max wait time)
> etc or because of a bug in the 0.9 broker. I have definitely under a
> debugger seen a problem where I was getting back empty messages from the
> broker running locally. It might be worth creating a bug for this.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > And just like that it stopped happening even though I didn't change any
> of
> > my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> > where the stock 0.9 kafka consumer was using very high CPU and seeing a
> lot
> > of EOFExceptions on the same topic and partition. I wonder if it was
> > hitting the same problem (lots of empty messages) even though we asked
> the
> > broker to park the request till enough bytes came through.
> >
> > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> >> I am writing a Kafka consumer client using the document at
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >>
> >> One place where I am having problems is the fetch request itself. I am
> >> able to send fetch requests and can get fetch responses that I can parse
> >> properly, but it seems like the broker is not respecting my max wait
> time
> >> and min fetch bytes parameters.
> >>
> >> To test this part I am sending in a fetch request for 128 partitions of
> a
> >> single topic  that hasn't seen any messages for a while and is currently
> >> empty. All 128 partitions are on the same broker (running 0.9). I would
> >> expect the broker to NOT send me any replies till my max_wait_time_ms
> >> elapses but it is sending me a reply immediately. This reply is empty
> (as
> >> expected) since the partitions have no data and I can parse the data
> just
> >> fine but I don't understand why the broker is sending me a reply
> >> immediately instead of waiting long enough.
> >>
> >> Here is how I make a request:
> >>
> >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> >>     // This does the math to get the size required.
> >>     final int sizeRequired =
> >> numBytesRequiredForFetchRequest(numPartitions);
> >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> >>     // Size field
> >>     int sizeField = sizeRequired - 4;
> >>     buffer.putInt(sizeField);
> >>     // API key.
> >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
> >>     // API version.
> >>     buffer.putShort((short) 0);
> >>     // Correlation id.
> >>     buffer.putInt(-3);  // Just a random correlation id.
> >>     // Client id.
> >>     buffer.putShort(numClientStringBytes); // The length of the client
> >> string as a short.
> >>     buffer.put(clientStringBytes); // The client string bytes.
> >>     // Replica id.
> >>     buffer.putInt(-1);  // As per the recommendation.
> >>     // Max wait time in ms.
> >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> >>     // Min bytes field size.
> >>     buffer.putInt(1000000);  // A big number.
> >>     // Num topics.
> >>     buffer.putInt(1); // A single topic.
> >>     // Topic string.
> >>     buffer.putShort(numTopicBytes); // The length of the topic string as
> >> a short.
> >>     buffer.put(topicBytes); // The topic string bytes.
> >>     // Num partitions field.
> >>     buffer.putInt(numPartitions); // 128 like I said.
> >>     for (int i = 0; i < numPartitions; i++) {
> >>       final int partitionId = i;
> >>       // partition number.
> >>       buffer.putInt(partitionId);
> >>       // offset.
> >>       buffer.putLong(partitionToOffset[partitionId]); // I have an array
> >> of longs to get this from.
> >>       // maxBytesPerPartition.
> >>       buffer.putInt(maxBytesPerPartition);
> >>     }
> >>
> >>     buffer.flip();
> >>
> >>     return buffer;
> >> }
> >>
> >> I get a response pretty much immediately when I write this request to
> the
> >> broker. The response parses just fine but has no actual non zero size
> >> message sets.
> >>
> >> Thanks in advance.
> >> Rajiv
> >>
> >>
> >
>

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
Indeed this seems to be the case. I am now running the client mentioned in
https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
taking up high CPU. The high number of EOF exceptions are also gone. It is
performing very well now. I can't understand if the improvement is because
of my config  changes (min_bytes, max_bytes_per_partition, max wait time)
etc or because of a bug in the 0.9 broker. I have definitely under a
debugger seen a problem where I was getting back empty messages from the
broker running locally. It might be worth creating a bug for this.

Thanks,
Rajiv

On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> And just like that it stopped happening even though I didn't change any of
> my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> where the stock 0.9 kafka consumer was using very high CPU and seeing a lot
> of EOFExceptions on the same topic and partition. I wonder if it was
> hitting the same problem (lots of empty messages) even though we asked the
> broker to park the request till enough bytes came through.
>
> On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> I am writing a Kafka consumer client using the document at
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>
>> One place where I am having problems is the fetch request itself. I am
>> able to send fetch requests and can get fetch responses that I can parse
>> properly, but it seems like the broker is not respecting my max wait time
>> and min fetch bytes parameters.
>>
>> To test this part I am sending in a fetch request for 128 partitions of a
>> single topic  that hasn't seen any messages for a while and is currently
>> empty. All 128 partitions are on the same broker (running 0.9). I would
>> expect the broker to NOT send me any replies till my max_wait_time_ms
>> elapses but it is sending me a reply immediately. This reply is empty (as
>> expected) since the partitions have no data and I can parse the data just
>> fine but I don't understand why the broker is sending me a reply
>> immediately instead of waiting long enough.
>>
>> Here is how I make a request:
>>
>> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>     // This does the math to get the size required.
>>     final int sizeRequired =
>> numBytesRequiredForFetchRequest(numPartitions);
>>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>>     // Size field
>>     int sizeField = sizeRequired - 4;
>>     buffer.putInt(sizeField);
>>     // API key.
>>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>>     // API version.
>>     buffer.putShort((short) 0);
>>     // Correlation id.
>>     buffer.putInt(-3);  // Just a random correlation id.
>>     // Client id.
>>     buffer.putShort(numClientStringBytes); // The length of the client
>> string as a short.
>>     buffer.put(clientStringBytes); // The client string bytes.
>>     // Replica id.
>>     buffer.putInt(-1);  // As per the recommendation.
>>     // Max wait time in ms.
>>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>>     // Min bytes field size.
>>     buffer.putInt(1000000);  // A big number.
>>     // Num topics.
>>     buffer.putInt(1); // A single topic.
>>     // Topic string.
>>     buffer.putShort(numTopicBytes); // The length of the topic string as
>> a short.
>>     buffer.put(topicBytes); // The topic string bytes.
>>     // Num partitions field.
>>     buffer.putInt(numPartitions); // 128 like I said.
>>     for (int i = 0; i < numPartitions; i++) {
>>       final int partitionId = i;
>>       // partition number.
>>       buffer.putInt(partitionId);
>>       // offset.
>>       buffer.putLong(partitionToOffset[partitionId]); // I have an array
>> of longs to get this from.
>>       // maxBytesPerPartition.
>>       buffer.putInt(maxBytesPerPartition);
>>     }
>>
>>     buffer.flip();
>>
>>     return buffer;
>> }
>>
>> I get a response pretty much immediately when I write this request to the
>> broker. The response parses just fine but has no actual non zero size
>> message sets.
>>
>> Thanks in advance.
>> Rajiv
>>
>>
>

Re: Kafka protocol fetch request max wait.

Posted by Rajiv Kurian <ra...@signalfx.com>.
And just like that it stopped happening even though I didn't change any of
my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159 where
the stock 0.9 kafka consumer was using very high CPU and seeing a lot of
EOFExceptions on the same topic and partition. I wonder if it was hitting
the same problem (lots of empty messages) even though we asked the broker
to park the request till enough bytes came through.

On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I am writing a Kafka consumer client using the document at
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>
> One place where I am having problems is the fetch request itself. I am
> able to send fetch requests and can get fetch responses that I can parse
> properly, but it seems like the broker is not respecting my max wait time
> and min fetch bytes parameters.
>
> To test this part I am sending in a fetch request for 128 partitions of a
> single topic  that hasn't seen any messages for a while and is currently
> empty. All 128 partitions are on the same broker (running 0.9). I would
> expect the broker to NOT send me any replies till my max_wait_time_ms
> elapses but it is sending me a reply immediately. This reply is empty (as
> expected) since the partitions have no data and I can parse the data just
> fine but I don't understand why the broker is sending me a reply
> immediately instead of waiting long enough.
>
> Here is how I make a request:
>
> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>     // This does the math to get the size required.
>     final int sizeRequired =
> numBytesRequiredForFetchRequest(numPartitions);
>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>     // Size field
>     int sizeField = sizeRequired - 4;
>     buffer.putInt(sizeField);
>     // API key.
>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>     // API version.
>     buffer.putShort((short) 0);
>     // Correlation id.
>     buffer.putInt(-3);  // Just a random correlation id.
>     // Client id.
>     buffer.putShort(numClientStringBytes); // The length of the client
> string as a short.
>     buffer.put(clientStringBytes); // The client string bytes.
>     // Replica id.
>     buffer.putInt(-1);  // As per the recommendation.
>     // Max wait time in ms.
>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>     // Min bytes field size.
>     buffer.putInt(1000000);  // A big number.
>     // Num topics.
>     buffer.putInt(1); // A single topic.
>     // Topic string.
>     buffer.putShort(numTopicBytes); // The length of the topic string as a
> short.
>     buffer.put(topicBytes); // The topic string bytes.
>     // Num partitions field.
>     buffer.putInt(numPartitions); // 128 like I said.
>     for (int i = 0; i < numPartitions; i++) {
>       final int partitionId = i;
>       // partition number.
>       buffer.putInt(partitionId);
>       // offset.
>       buffer.putLong(partitionToOffset[partitionId]); // I have an array
> of longs to get this from.
>       // maxBytesPerPartition.
>       buffer.putInt(maxBytesPerPartition);
>     }
>
>     buffer.flip();
>
>     return buffer;
> }
>
> I get a response pretty much immediately when I write this request to the
> broker. The response parses just fine but has no actual non zero size
> message sets.
>
> Thanks in advance.
> Rajiv
>
>