Posted to users@kafka.apache.org by Manu Zhang <ow...@gmail.com> on 2015/01/19 01:54:26 UTC

can't iterate consumed messages when checking errorCode first

Hi all,

I'm using the Kafka low-level consumer API and find that in the code below
"iterator.hasNext" always returns false. Through debugging, I'm sure the
messageSet has a size of "fetchSize".

    val consumer = new SimpleConsumer(broker.host, broker.port,
      soTimeout, soBufferSize, clientId)
    val request = new FetchRequestBuilder()
      .addFetch(topic, partition, offset, fetchSize)
      .build()
    val response = consumer.fetch(request)
    response.errorCode(topic, partition) match {
      case NoError => {
        iterator = response.messageSet(topic, partition).iterator
      }
      case error => throw exceptionFor(error)
    }

The weird thing is that the iterator works fine when I get the iterator
directly without checking the error code.

    val consumer = new SimpleConsumer(broker.host, broker.port,
      soTimeout, soBufferSize, clientId)
    val request = new FetchRequestBuilder()
      .addFetch(topic, partition, offset, fetchSize)
      .build()
    consumer.fetch(request).messageSet(topic, partition).iterator

Any thoughts?

Thanks,
Manu Zhang

Re: can't iterate consumed messages when checking errorCode first

Posted by Manu Zhang <ow...@gmail.com>.
Thanks Jun. I don't see any error code, and the fetch size is larger than
the largest single message. Actually, when I call
response.messageSet(topic, partition).toBuffer.size, the value is the number
of messages I've produced to Kafka.

On Tue Jan 20 2015 at 12:31:53 AM Jun Rao <ju...@confluent.io> wrote:

> Did you get any error code in the response? Also, make sure fetchSize is
> larger than the largest single message.
>
> Thanks,
>
> Jun

Re: can't iterate consumed messages when checking errorCode first

Posted by Jun Rao <ju...@confluent.io>.
Did you get any error code in the response? Also, make sure fetchSize is
larger than the largest single message.

Thanks,

Jun
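
To illustrate the second point above (fetchSize versus the largest single
message), here is a minimal sketch of how a fetch can return exactly
fetchSize bytes and still yield an empty iterator. It assumes a simplified
framing (a 4-byte length prefix followed by the payload), not Kafka's actual
message format, and the names FetchSizeSketch, encode, fetch and
completeEntries are hypothetical and not part of the Kafka API.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Simplified model, NOT Kafka's real wire format: each entry is a 4-byte
// length prefix followed by that many payload bytes.
object FetchSizeSketch {

  // Encode payloads into one contiguous byte array (stand-in for a partition log).
  def encode(payloads: Seq[String]): Array[Byte] = {
    val bodies = payloads.map(_.getBytes(UTF_8))
    val buf = ByteBuffer.allocate(bodies.map(_.length + 4).sum)
    bodies.foreach { b => buf.putInt(b.length); buf.put(b) }
    buf.array()
  }

  // Stand-in for a fetch: return at most fetchSize bytes, even if that
  // cuts the last entry in half.
  def fetch(log: Array[Byte], fetchSize: Int): Array[Byte] =
    log.take(fetchSize)

  // Iterate over complete entries only; a trailing partial entry is never returned.
  def completeEntries(bytes: Array[Byte]): Iterator[String] = new Iterator[String] {
    private val buf = ByteBuffer.wrap(bytes)

    def hasNext: Boolean =
      buf.remaining() >= 4 && {
        val len = buf.getInt(buf.position()) // absolute read, does not advance
        len >= 0 && buf.remaining() >= 4 + len
      }

    def next(): String = {
      if (!hasNext) throw new NoSuchElementException("no complete entry left")
      val body = new Array[Byte](buf.getInt()) // relative read, consumes the prefix
      buf.get(body)
      new String(body, UTF_8)
    }
  }
}
```

In this model, with val log = FetchSizeSketch.encode(Seq("hello", "kafka")),
each entry takes 9 bytes. FetchSizeSketch.fetch(log, 6) returns 6 bytes, yet
completeEntries on that result has no next element, because the only entry
in range is cut off. A fetch size of 12 yields just "hello", and anything
from 18 up yields both entries.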
