Posted to users@kafka.apache.org by Shamik Bandopadhyay <sh...@gmail.com> on 2016/09/06 23:21:06 UTC

Problem consuming message using custom serializer

Hi,

  I'm trying to send a Java object using a Kryo object serializer. The
producer is able to send the payload to the queue, but I'm having issues
reading the data in the consumer. I'm using a consumer group with KafkaStream.
The consumer code is based on the example in the Kafka documentation.
Here's the consumer code and the corresponding error:

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    executor = Executors.newFixedThreadPool(a_numThreads);
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
        threadNumber++;
    }
}

Inside ConsumerGroupSerializerObject's run method,

private KafkaStream m_stream;

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    ByteArrayInputStream in = null;
    ObjectInputStream is = null;
    while (it.hasNext()) {
        try {
            in = new ByteArrayInputStream(it.next().message());
            is = new ObjectInputStream(in);
            TextAnalysisRequest req = (TextAnalysisRequest) is.readObject();
        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            try {
                in.close();
                is.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

I'm getting an exception at the following line:

is = new ObjectInputStream(in);

java.io.StreamCorruptedException: invalid stream header: 01746573
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(ConsumerGroupSerializerObject.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
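
For reference, the header value in the exception can be checked with the JDK alone. A Java serialization stream starts with the magic bytes AC ED followed by the version 00 05, and ObjectInputStream rejects anything else in its constructor. The reported bytes 01 74 65 73 do not match: the last three are the ASCII characters "tes". That would be consistent with the payload having been written by something other than ObjectOutputStream (for example the Kryo serializer mentioned above), though that is an assumption and not something the trace proves. The class and method names below are made up for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, written only to illustrate the header check.
public class StreamHeaderCheck {

    // True when the payload starts with the Java serialization
    // magic (0xACED) and stream version (0x0005).
    public static boolean hasJavaSerializationHeader(byte[] payload) {
        return payload.length >= 4
                && (payload[0] & 0xFF) == 0xAC
                && (payload[1] & 0xFF) == 0xED
                && (payload[2] & 0xFF) == 0x00
                && (payload[3] & 0xFF) == 0x05;
    }

    // True when the ObjectInputStream constructor refuses the payload
    // with a StreamCorruptedException, as in the stack trace above.
    public static boolean rejectedByObjectInputStream(byte[] payload) {
        try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return false;
        } catch (StreamCorruptedException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes a value with standard Java serialization, for comparison.
    public static byte[] javaSerialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        // The four header bytes reported in the exception message.
        byte[] fromException = {0x01, 0x74, 0x65, 0x73};

        // Bytes 2 to 4 decoded as ASCII text.
        System.out.println(new String(fromException, 1, 3, StandardCharsets.US_ASCII));
        System.out.println(hasJavaSerializationHeader(fromException));   // false
        System.out.println(rejectedByObjectInputStream(fromException));  // true

        // A payload produced by ObjectOutputStream passes both checks.
        byte[] javaPayload = javaSerialize("test");
        System.out.println(hasJavaSerializationHeader(javaPayload));     // true
        System.out.println(rejectedByObjectInputStream(javaPayload));    // false
    }
}
```

If the check on a real message returns false, the bytes would need to be read back with the same library that wrote them instead of ObjectInputStream.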


Here are the consumer properties:

Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", KryoReadingSerializer.class.getName());

I'm new to Kafka, so I'm not entirely sure whether this is the right approach
to consuming messages with a custom serializer. Also, I'm using KafkaStream;
could that be an issue as well?

Any pointers will be appreciated.

Thanks,
Shamik

Re: Problem consuming message using custom serializer

Posted by Shamik Bandopadhyay <sh...@gmail.com>.
Anyone?
