Posted to users@kafka.apache.org by Anjani Gupta <an...@salesforce.com> on 2017/01/27 18:19:09 UTC

kafka_2.10-0.8.1 simple consumer retrieves junk data in the message

I am using kafka_2.10-0.8.1 and trying to fetch messages with the Simple
Consumer API. I notice that the byte array for each retrieved message has 26
junk bytes prepended to the original message sent by the producer. Any idea
what's going on here? This works fine with the high-level consumer.

This is what my code looks like:

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
        new OffsetFetchRequest(GROUP_ID,
                Collections.singletonList(topicAndPartition), (short) 0, 0,
                CLIENT_ID));

// Fetch messages from Kafka.
FetchRequest req = new FetchRequestBuilder()
        .clientId(CLIENT_ID)
        .addFetch(topic, partition, readOffset, 1000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
    byte[] message = messageAndOffset.message().payload().array();

}

Here, the message array has 26 additional bytes at its beginning.
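A likely explanation, offered as an assumption: in the 0.8 client, Message.payload() appears to return a ByteBuffer that is a slice over a larger buffer, and java.nio's ByteBuffer.array() always returns the entire backing array, ignoring the slice's arrayOffset(), position() and limit(). The self-contained sketch below reproduces the effect with plain java.nio only (no Kafka classes); the class name PayloadCopyDemo, the helper toBytes and the 26 stand-in header bytes are hypothetical. It copies only the bytes between position() and limit() instead of calling array().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadCopyDemo {

    // Copies exactly the bytes between position() and limit().
    // duplicate() shares the content but has its own position/limit,
    // so the caller's buffer is left untouched.
    static byte[] toBytes(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);

        // Hypothetical stand-in for a fetched message: 26 header bytes
        // followed by the value, all in one backing array.
        ByteBuffer whole = ByteBuffer.allocate(26 + value.length);
        whole.position(26);
        whole.put(value);

        // Slice off the value region, similar to what payload() hands back.
        whole.position(26);
        ByteBuffer payload = whole.slice();

        byte[] wrong = payload.array();  // entire backing array: 31 bytes
        byte[] right = toBytes(payload); // only the value: 5 bytes

        System.out.println(wrong.length + " " + right.length);
        System.out.println(new String(right, StandardCharsets.UTF_8));
        System.out.println(Arrays.equals(right, value));
    }
}
```

Applied to the loop in the question, this would mean replacing payload().array() with something like toBytes(messageAndOffset.message().payload()), so the array length follows the buffer's limit rather than the size of whatever buffer the client read the response into.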


Thanks,
Anjani

Re: kafka_2.10-0.8.1 simple consumer retrieves junk data in the message

Posted by Anjani Gupta <an...@salesforce.com>.
We use the following method to deserialize messages consumed with the Simple
Consumer:

DatumReader<T> datumReader = new SpecificDatumReader<>(className);
ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
T object = datumReader.read(null, decoder);
IOUtils.closeQuietly(inputStream);


It does not seem to handle header bytes. When I remove those 26 bytes,
deserialization works fine. Please note that we are using the Simple Consumer
API, not the high-level consumer.
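As for what the 26 bytes might be: one plausible reading, assuming the message is stored in the 0.8 (magic 0) format with a null key, is the per-entry framing of a message set. That is an 8-byte offset and a 4-byte message size, then the message's 4-byte CRC, 1-byte magic, 1-byte attributes, 4-byte key length (-1 for a null key) and 4-byte value length, which adds up to 26 bytes before the value. The sketch below builds and parses one such entry with java.nio only; MessageLayoutDemo, buildEntry and valueStart are hypothetical names, and the code only illustrates the layout rather than replacing the client's own parsing. It also shows why stripping a fixed 26 bytes is fragile: a non-null key shifts the value further along.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class MessageLayoutDemo {

    // Builds one message-set entry in the 0.8 (magic 0) layout with a null key:
    // offset(8) size(4) | crc(4) magic(1) attributes(1) keyLen(4) valueLen(4) value
    static ByteBuffer buildEntry(long offset, byte[] value) {
        int messageSize = 4 + 1 + 1 + 4 + 4 + value.length;
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + messageSize);
        buf.putLong(offset);
        buf.putInt(messageSize);
        int crcPos = buf.position();
        buf.putInt(0);            // CRC placeholder, filled in below
        buf.put((byte) 0);        // magic byte 0
        buf.put((byte) 0);        // attributes: no compression
        buf.putInt(-1);           // key length -1 means a null key
        buf.putInt(value.length); // value length
        buf.put(value);
        // The CRC covers everything in the message after the CRC field itself.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), crcPos + 4, messageSize - 4);
        buf.putInt(crcPos, (int) crc.getValue());
        buf.flip();
        return buf;
    }

    // Returns the index at which the value starts, skipping the key if present.
    static int valueStart(ByteBuffer entry) {
        int pos = 8 + 4 + 4 + 1 + 1;    // offset, size, crc, magic, attributes
        int keyLen = entry.getInt(pos);
        pos += 4 + Math.max(keyLen, 0); // key length field plus any key bytes
        return pos + 4;                 // skip the value length field
    }

    public static void main(String[] args) {
        ByteBuffer entry = buildEntry(0L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(valueStart(entry)); // 26 for a null key
    }
}
```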


On Mon, Jan 30, 2017 at 10:57 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> What are the 26 additional bytes? That sounds like a header that a
> decoder/deserializer is handling with the high level consumer. What class
> are you using to deserialize the messages with the high level consumer?
>
> -Ewen

Re: kafka_2.10-0.8.1 simple consumer retrieves junk data in the message

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
What are the 26 additional bytes? That sounds like a header that a
decoder/deserializer is handling with the high level consumer. What class
are you using to deserialize the messages with the high level consumer?

-Ewen
