Posted to users@kafka.apache.org by Ratha v <vi...@gmail.com> on 2016/03/22 03:38:08 UTC

How to publish/consume java bean objects to Kafka 2.11 version?

Hi all,
I'm a newbie to Kafka. I'm trying to publish my Java object to a Kafka topic and
then consume it.
I see there are some API changes in the latest version of Kafka. Can
anybody point me to some samples showing how to publish and consume Java objects? I
have written my own data serializer, but could not publish with it to a topic.
Any guide or samples would be appreciated.


*CustomSerializer*



import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;


import kafka.serializer.Decoder;
import kafka.serializer.Encoder;

public class CustomSerializer implements Encoder<FileObj>, Decoder<FileObj> {

    // Serializes a FileObj with Java serialization; returns null on failure.
    @Override
    public byte[] toBytes(FileObj file) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes the stream even if writeObject throws
        try (ObjectOutput out = new ObjectOutputStream(bos)) {
            out.writeObject(file);
            out.flush(); // make sure all buffered bytes reach bos before copying
            return bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    // Rebuilds a FileObj from its serialized bytes; returns null on failure.
    @Override
    public FileObj fromBytes(byte[] fileContent) {
        try (ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(fileContent))) {
            return (FileObj) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}



-Ratha

http://vvratha.blogspot.com/
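
A note on the newer client API: kafka.serializer.Encoder and Decoder belong to the old Scala clients. The newer Java clients use org.apache.kafka.common.serialization.Serializer<T> and Deserializer<T> instead, and the producer is told which classes to use through the "key.serializer" and "value.serializer" settings. The sketch below is only an illustration that runs without Kafka on the classpath: the FileObj fields, the class name BeanCodecSketch, the broker address and com.example.FileObjSerializer are all assumptions, not taken from the post. The two static methods hold the logic that would go into serialize(String topic, FileObj data) and deserialize(String topic, byte[] data).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for the FileObj bean (its fields are not shown in the post).
class FileObj implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;
    private final long size;

    FileObj(String name, long size) {
        this.name = name;
        this.size = size;
    }

    String getName() { return name; }
    long getSize() { return size; }
}

class BeanCodecSketch {
    // Logic that could back Serializer<FileObj>.serialize(topic, data).
    static byte[] toBytes(FileObj file) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Copy the bytes only after the stream is closed, so everything is flushed.
        return bos.toByteArray();
    }

    // Logic that could back Deserializer<FileObj>.deserialize(topic, data).
    static FileObj fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (FileObj) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Producer settings for the newer client; address and value serializer are placeholders.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.example.FileObjSerializer"); // hypothetical class
        return props;
    }

    public static void main(String[] args) {
        FileObj copy = fromBytes(toBytes(new FileObj("report.txt", 42L)));
        System.out.println(copy.getName() + " " + copy.getSize());
    }
}
```

With the Kafka client jar available, these properties would be passed to a KafkaProducer<String, FileObj>, and objects published with producer.send(new ProducerRecord<>(topic, key, fileObj)). The consumer side mirrors this with "key.deserializer" and "value.deserializer".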

Re: How to publish/consume java bean objects to Kafka 2.11 version?

Posted by Ratha v <vi...@gmail.com>.
Thanks Gerard for the helpful hint. I was looking for samples on how to
publish/consume Java objects and could not find any concrete ones. Some
references point to old APIs, I guess.

I'm using Kafka 2.11. I have written my own serialiser (I thought I
had to, based on the guides) but didn't know which methods to use to
publish.

Now I can somehow publish and consume objects.
I'll look into the Avro schema registry too.

Regards,
Ratha.


On 22 March 2016 at 18:25, Gerard Klijs <ge...@dizzit.com> wrote:

> If I'm reading this right, your question is more about how to successfully
> (de)serialise Java objects? You might want to take a look at the Confluent
> Avro schema registry. Using Avro schemas you can easily store messages in
> a Java object generated from the schema. This way the messages will also be a
> lot smaller, which helps performance. And you don't have to maintain your
> own (de)serialiser.



-- 
-Ratha
http://vvratha.blogspot.com/

Re: How to publish/consume java bean objects to Kafka 2.11 version?

Posted by Gerard Klijs <ge...@dizzit.com>.
If I'm reading this right, your question is more about how to successfully
(de)serialise Java objects? You might want to take a look at the Confluent
Avro schema registry. Using Avro schemas you can easily store messages in
a Java object generated from the schema. This way the messages will also be a
lot smaller, which helps performance. And you don't have to maintain your
own (de)serialiser.
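
To illustrate the point about message size: Java serialization writes a stream header and a class descriptor along with the field values, whereas a schema-based format can send little more than the values because both sides already know the layout. The sketch below is not Avro. It uses a plain DataOutputStream as a stand-in for "fields only" encoding, and the SampleFile bean and the class name EncodingSizeSketch are made up for this comparison.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical bean used only for this size comparison.
class SampleFile implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final long size;

    SampleFile(String name, long size) {
        this.name = name;
        this.size = size;
    }
}

class EncodingSizeSketch {
    // Java serialization: stream header and class descriptor travel with the payload.
    static byte[] javaSerialized(SampleFile f) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(f);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Fields-only encoding: the reader must already know the layout (the "schema").
    static byte[] fieldsOnly(SampleFile f) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(f.name);   // 2-byte length prefix followed by the encoded characters
            out.writeLong(f.size);  // 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        SampleFile f = new SampleFile("report.txt", 42L);
        System.out.println("java serialization: " + javaSerialized(f).length + " bytes");
        System.out.println("fields only:        " + fieldsOnly(f).length + " bytes");
    }
}
```

The gap matters most for small messages, where the class metadata can outweigh the data itself. A schema registry additionally lets producers and consumers agree on the layout and evolve it, which this hand-rolled encoding does not do.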
