Posted to users@kafka.apache.org by Patricio Echagüe <pa...@gmail.com> on 2012/04/04 00:22:26 UTC

Help with encoding issue.

Hi, I noticed that the String serializer somehow doesn't correctly encode
special characters such as "ü".

I tried to create a ByteBufferEncoder this way:

import java.nio.ByteBuffer;

import kafka.message.Message;
import kafka.serializer.Encoder;

public class ByteBufferEncoder implements Encoder<ByteBuffer> {
    public Message toMessage(ByteBuffer buffer) {
        return new Message(buffer);
    }
}


but I get this exception [1].

Could you guys please advise on how to fix my encoding issue?

Thanks


[1]

Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
    at kafka.message.Message.compressionCodec(Message.scala:144)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(SyncProducer.scala:139)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:116)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
    at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
    at kafka.producer.Producer.zkSend(Producer.scala:143)
    at kafka.producer.Producer.send(Producer.scala:105)
    at kafka.javaapi.producer.Producer.send(Producer.scala:104)
    at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)

Re: Help with encoding issue.

Posted by Jun Rao <ju...@gmail.com>.
I think the issue is that you used the wrong constructor of Message. A
client should only use this(bytes: Array[Byte]), not this(buffer:
ByteBuffer). The latter is for internal use only and expects a buffer that
already includes Kafka metadata. We should probably restrict the visibility
of the latter API.
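
A minimal sketch of that suggestion, assuming the payload is everything between the buffer's position and limit. PayloadBytes and toByteArray are hypothetical names, not part of Kafka. The only Kafka call assumed is the Message(byte[]) constructor mentioned above, and it appears only in a comment so the block compiles without Kafka on the classpath.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not a Kafka class): turns a ByteBuffer payload into a byte[].
final class PayloadBytes {

    private PayloadBytes() {
    }

    // Copies the bytes between position and limit into a fresh array.
    // duplicate() has its own position and limit, so the caller's buffer is not consumed.
    static byte[] toByteArray(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}

// Assumed use inside the encoder from the original post:
//
//   public Message toMessage(ByteBuffer buffer) {
//       return new Message(PayloadBytes.toByteArray(buffer));
//   }
```

The copy costs one extra allocation per message, but it hands Kafka raw payload bytes only and leaves the framing to the client-facing constructor.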

Thanks,

Jun


Re: Help with encoding issue.

Posted by Eric Tschetter <ec...@gmail.com>.
If it's just an encoding issue like Jay says, you might also consider
setting the JVM parameter

-Dfile.encoding=UTF-8

That sets the default encoding for the whole JVM process to UTF-8,
which is probably what you want. It is, of course, better to specify
the Charset explicitly, but this might be a good short-term workaround.
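
For the explicit option, a minimal sketch using only the JDK might look like this. Utf8Text is a hypothetical helper, not a Kafka class, and it assumes that producer and consumer both agree on UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Kafka class): always uses UTF-8, never the JVM default.
final class Utf8Text {

    private static final Charset UTF8 = StandardCharsets.UTF_8;

    private Utf8Text() {
    }

    // Encodes with an explicit charset; String.getBytes() without arguments
    // would use the JVM default instead.
    static byte[] encode(String text) {
        return text.getBytes(UTF8);
    }

    // Decodes the remaining bytes of a payload as UTF-8.
    // duplicate() keeps the caller's buffer position untouched.
    static String decode(ByteBuffer payload) {
        return UTF8.decode(payload.duplicate()).toString();
    }
}
```

With this, Utf8Text.decode(ByteBuffer.wrap(Utf8Text.encode(text))) gives back the original string whatever file.encoding is set to, so the result no longer depends on how each JVM was started.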

--Eric



Re: Help with encoding issue.

Posted by Jay Kreps <ja...@gmail.com>.
Yeah, I think I jumped to conclusions. The issue I was referring to was
just that we assume the default encoding, which would not cause the
exception you described.
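
One hint that this is not a charset problem, offered as a reading of the stack trace rather than a confirmed diagnosis: 34 is the ASCII code of the double quote character. If the raw payload started with a quote (an assumption, for example a quoted or JSON string) and its first byte was read as the magic byte, that would produce exactly this number. The sketch below only shows that byte-level fact. MagicByteDemo is a hypothetical name and no Kafka code is involved.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class (not part of Kafka).
final class MagicByteDemo {

    private MagicByteDemo() {
    }

    // Returns the byte at the buffer's current position without consuming it.
    static byte firstByte(ByteBuffer buffer) {
        return buffer.get(buffer.position());
    }

    // Builds a UTF-8 payload for the given text wrapped in double quotes (assumed input shape).
    static ByteBuffer quotedPayload(String text) {
        return ByteBuffer.wrap(("\"" + text + "\"").getBytes(StandardCharsets.UTF_8));
    }
}
```

MagicByteDemo.firstByte(MagicByteDemo.quotedPayload("ü")) is 34 in UTF-8, and would be in any other ASCII-compatible encoding, which fits the idea that the exception comes from how the buffer is interpreted and not from how "ü" is encoded.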

-Jay


Re: Help with encoding issue.

Posted by Patricio Echagüe <pa...@gmail.com>.
Interesting. What I can't explain though is why it works just fine when
printing the string this way:

    for (Message message : stream) {
        // duplicate() so that reading the bytes does not move the payload's position
        ByteBuffer bb = message.payload().duplicate();
        byte[] bytes = new byte[bb.remaining()];
        bb.get(bytes);
        // new String(bytes) decodes with the platform default charset
        System.out.println("Message received string: " + new String(bytes));
        consumerConnector.commitOffsets();
    }

Do you have a link to your patch, Jay?


Re: Help with encoding issue.

Posted by Jay Kreps <ja...@gmail.com>.
This is our bug: we were taking the system default encoding (d'oh). I
have a patch for it that I was adding to 0.8; we can probably backport it
to older releases pretty easily too.

-Jay
