Posted to users@kafka.apache.org by Lu Xuechao <lu...@gmail.com> on 2013/10/31 19:26:40 UTC

SimpleConsumer cannot read KeyedMessage.

Hi,

I am following the SimpleConsumer example:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

When I send KeyedMessage<String, String> with StringEncoder, I can read the
messages back:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic,
m_partition)) {
 //handle messages
}

But when I send KeyedMessage<byte[], byte[]> with DefaultEncoder, I cannot
get the messages:

Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic,
m_partition).iterator();
itr.hasNext() returns false.

The test code is otherwise the same. What is causing this, and what change
needs to be made?

Thanks.

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Jun Rao <ju...@gmail.com>.
Did you make sure the fetch size in the fetch request is larger than the
size of a single message?
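
To illustrate why the fetch size matters, here is a minimal, self-contained sketch. It is a toy model, not Kafka code: the "log" is a run of [4-byte length][payload] entries, and the class and method names (FetchSizeSketch, buildLog, fetch) are made up for this example. The point it shows is that a fetch can return bytes and still yield nothing to iterate when the first entry did not arrive complete. In the 0.8 SimpleConsumer example, the corresponding knob is the last argument to FetchRequestBuilder.addFetch(topic, partition, offset, fetchSize).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model only: length-prefixed entries, not Kafka's real wire format.
class FetchSizeSketch {

    // Append each payload to the log as a [4-byte length][payload] entry.
    static byte[] buildLog(String... payloads) {
        int total = 0;
        for (String p : payloads) {
            total += 4 + p.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (String p : payloads) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        return buf.array();
    }

    // Simulate a fetch: at most fetchSize bytes come back, and only entries
    // that arrived complete can be iterated.
    static List<String> fetch(byte[] log, int fetchSize) {
        int limit = Math.min(fetchSize, log.length);
        ByteBuffer buf = ByteBuffer.wrap(log, 0, limit);
        List<String> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int size = buf.getInt();
            if (size > buf.remaining()) {
                break; // trailing entry was cut off by the fetch size
            }
            byte[] payload = new byte[size];
            buf.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] log = buildLog("a-fairly-long-first-message", "second");
        // Fetch size smaller than the first entry: bytes returned, nothing to iterate.
        System.out.println(fetch(log, 16));
        // Fetch size large enough for the whole log: both entries come back.
        System.out.println(fetch(log, 1024));
    }
}
```

If a response has bytes but the iterator is empty, raising the fetch size is a cheap first thing to try.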

Thanks,

Jun


On Fri, Nov 1, 2013 at 5:07 PM, Lu Xuechao <lu...@gmail.com> wrote:

> The consumer starts from offset 0. Yes, there is data in the log dir.

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Lu Xuechao <lu...@gmail.com>.
The consumer starts from offset 0. Yes, there is data in the log dir.


On Fri, Nov 1, 2013 at 4:06 PM, Jun Rao <ju...@gmail.com> wrote:

> Which offset did you use for fetching? Is there data in the kafka log dir?
>
> Thanks,
>
> Jun

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Jun Rao <ju...@gmail.com>.
Which offset did you use for fetching? Is there data in the kafka log dir?
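
The offset matters more once compression is on: as the SimpleConsumer example notes, a fetch then returns a whole compressed block even when the requested offset falls in the middle of it, so the consumer has to skip entries below the offset it asked for. A minimal sketch of that bookkeeping follows. It is plain Java with no Kafka types; OffsetSketch and consume are hypothetical names, and `offset + 1` stands in for MessageAndOffset.nextOffset().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a fetched batch is just the list of offsets it contains.
class OffsetSketch {

    // Handle one fetched batch and return the offset to request next.
    static long consume(long[] batchOffsets, long readOffset, List<Long> handled) {
        for (long offset : batchOffsets) {
            if (offset < readOffset) {
                continue; // older entry from a block that starts before readOffset
            }
            handled.add(offset);
            readOffset = offset + 1; // stands in for nextOffset()
        }
        return readOffset;
    }

    public static void main(String[] args) {
        List<Long> handled = new ArrayList<>();
        // Asked for offset 3, but the block that came back starts at offset 0.
        long next = consume(new long[] {0, 1, 2, 3, 4}, 3, handled);
        System.out.println("handled=" + handled + " next=" + next);
    }
}
```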

Thanks,

Jun


On Fri, Nov 1, 2013 at 11:48 AM, Lu Xuechao <lu...@gmail.com> wrote:

> I checked fetchResponse.hasError(), but it reports no error.

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Lu Xuechao <lu...@gmail.com>.
I checked fetchResponse.hasError(), but it reports no error.


On Fri, Nov 1, 2013 at 7:45 AM, Jun Rao <ju...@gmail.com> wrote:

> Did you check the error code associated with each partition in the fetch
> response?
>
> Thanks,
>
> Jun

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Jun Rao <ju...@gmail.com>.
Did you check the error code associated with each partition in the fetch
response?
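
As a rough sketch of what a per-partition check could look like: in the 0.8 SimpleConsumer example the code for one partition is read with fetchResponse.errorCode(topic, partition). The snippet below does not call Kafka at all. It takes a map of partition to error code and reports the non-zero ones. FetchErrorSketch, describe, and report are hypothetical names, and the code-to-name table is a small hand-copied subset that should be checked against ErrorMapping in the Kafka version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a hand-copied subset of broker error codes.
class FetchErrorSketch {

    // Map a numeric error code to a readable name (subset; verify for your version).
    static String describe(short code) {
        switch (code) {
            case 0:  return "NoError";
            case 1:  return "OffsetOutOfRange";
            case 3:  return "UnknownTopicOrPartition";
            case 6:  return "NotLeaderForPartition";
            case 10: return "MessageSizeTooLarge";
            default: return "Other(" + code + ")";
        }
    }

    // List every partition whose code is non-zero, in partition order.
    static List<String> report(Map<Integer, Short> codeByPartition) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<Integer, Short> e
                : new TreeMap<Integer, Short>(codeByPartition).entrySet()) {
            if (e.getValue() != 0) {
                problems.add("partition " + e.getKey() + ": " + describe(e.getValue()));
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<Integer, Short> codes = new TreeMap<>();
        codes.put(0, (short) 0);
        codes.put(1, (short) 1);
        codes.put(2, (short) 6);
        report(codes).forEach(System.out::println);
    }
}
```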

Thanks,

Jun


On Thu, Oct 31, 2013 at 9:59 PM, Lu Xuechao <lu...@gmail.com> wrote:

> No. The simple consumer does receive some responses and can iterate over
> them in the loop:
>
> for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic,
> m_partition)) {
>  //handle messages
> }
>
> But after that, the responses still come back with bytes (I can see the
> content), yet the iterator yields nothing:
>
> Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic,
> m_partition).iterator();
> itr.hasNext() returns false.
>
> No error messages found.

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Lu Xuechao <lu...@gmail.com>.
I enabled gzip compression. Each topic has 10 partitions, and each partition
is handled by one simple consumer thread. All consumers stop iterating after
the first several responses. The responses still come back with bytes, but
the iterator yields no messages.


On Thu, Oct 31, 2013 at 9:59 PM, Lu Xuechao <lu...@gmail.com> wrote:

> No. The simple consumer does receive some responses and can iterate over
> them in the loop:
>
> for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic,
> m_partition)) {
>  //handle messages
> }
>
> But after that, the responses still come back with bytes (I can see the
> content), yet the iterator yields nothing:
>
> Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic,
> m_partition).iterator();
> itr.hasNext() returns false.
>
> No error messages found.

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Lu Xuechao <lu...@gmail.com>.
No. The simple consumer does receive some responses and can iterate over
them in the loop:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic,
m_partition)) {
 //handle messages
}

But after that, the responses still come back with bytes (I can see the
content), yet the iterator yields nothing:

Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic,
m_partition).iterator();
itr.hasNext() returns false.

No error messages found.



On Thu, Oct 31, 2013 at 9:33 PM, Jun Rao <ju...@gmail.com> wrote:

> Is that related to
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whybrokersdonotreceiveproducersentmessages%3F
> ?
>
> Thanks,
>
> Jun

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Jun Rao <ju...@gmail.com>.
Is that related to
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whybrokersdonotreceiveproducersentmessages%3F?

Thanks,

Jun


On Thu, Oct 31, 2013 at 2:23 PM, Lu Xuechao <lu...@gmail.com> wrote:

> It seems the reason is that I enabled gzip compression.
>
> What would the code look like to consume compressed messages?
>
> Thanks.

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Guozhang Wang <wa...@gmail.com>.
Xuechao,

The same code should also work with gzip compression on
KeyedMessage<byte[], byte[]>. Did you see any exceptions in the consumer
logs?
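
One thing that does change with compression is the unit being fetched: a compressed batch travels as a single wrapper, so the fetch size has to cover the whole wrapper rather than one original message. The sketch below models that with java.util.zip only. It is an assumption-laden toy, not Kafka's on-disk format, and GzipBatchSketch, compressBatch, decompressBatch, and fetch are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Toy model: a batch of payloads packed into ONE gzip-compressed wrapper.
class GzipBatchSketch {

    // Compress a whole batch into a single wrapper.
    static byte[] compressBatch(List<String> payloads) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            out.writeInt(payloads.size());
            for (String p : payloads) {
                out.writeUTF(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Unpack a complete wrapper back into its payloads.
    static List<String> decompressBatch(byte[] wrapper) {
        List<String> payloads = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(wrapper)))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                payloads.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return payloads;
    }

    // What can be iterated when only fetchSize bytes of the wrapper arrive.
    static List<String> fetch(byte[] wrapper, int fetchSize) {
        if (fetchSize < wrapper.length) {
            return new ArrayList<>(); // incomplete wrapper: nothing usable
        }
        return decompressBatch(wrapper);
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>();
        batch.add("m1");
        batch.add("m2");
        batch.add("m3");
        byte[] wrapper = compressBatch(batch);
        System.out.println(fetch(wrapper, wrapper.length - 1)); // wrapper cut short
        System.out.println(fetch(wrapper, wrapper.length));     // whole wrapper
    }
}
```

Under this model, larger producer batches mean a larger wrapper, which is one reason a fetch size that worked without compression may stop yielding messages with it.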

Guozhang


On Thu, Oct 31, 2013 at 2:23 PM, Lu Xuechao <lu...@gmail.com> wrote:

> It seems the reason is that I enabled gzip compression.
>
> What would the code look like to consume compressed messages?
>
> Thanks.



-- 
-- Guozhang

Re: SimpleConsumer cannot read KeyedMessage.

Posted by Lu Xuechao <lu...@gmail.com>.
It seems the reason is that I enabled gzip compression.

What would the code look like to consume compressed messages?

Thanks.


On Thu, Oct 31, 2013 at 11:26 AM, Lu Xuechao <lu...@gmail.com> wrote:

> Hi,
>
> I am following the SimpleConsumer example:
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> When I send KeyedMessage<String, String> with StringEncoder, I can read the
> messages back:
>
> for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic,
> m_partition)) {
>  //handle messages
> }
>
> But when I send KeyedMessage<byte[], byte[]> with DefaultEncoder, I cannot
> get the messages:
>
> Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic,
> m_partition).iterator();
> itr.hasNext() returns false.
>
> The test code is otherwise the same. What is causing this, and what change
> needs to be made?
>
> Thanks.
>