You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by prashant amar <am...@gmail.com> on 2013/07/17 06:56:41 UTC

Message Encoding in Kafka 0.8

I noticed that ProducerData has been removed in 0.8 branch

If I'd wish to send a complex message type (encoded message) ,

how would I do it in 0.8?


In 0.7.X (a snippet from Neha's example)

   val producer = new Producer[Message, MemberRecord](config);


    // send a single message
    val message = new MemberRecord(1, "John", "US")
    val producerData = new ProducerData[Message,
MemberRecord]("member-records", message)
    producer.send(producerData)

class MemberRecord(val memberId: Int, val name: String, val location: String) {
  override def toString = {
    "(" + memberId + "," + name + "," + location + ")"
  }
}

class MemberRecordEncoder extends Encoder[MemberRecord] {
  def toMessage(member: MemberRecord):Message = {
    val outputStream = new ByteArrayOutputStream()
    val dos = new DataOutputStream(outputStream)
    dos.writeInt(member.memberId)
    dos.writeUTF(member.name)
    dos.writeUTF(member.location)
    outputStream.flush

    new Message(outputStream.toByteArray)
  }

Re: Message Encoding in Kafka 0.8

Posted by Jun Rao <ju...@gmail.com>.
The following code indicates that you want to use "encode" as the key
associated with the message. Is that your intend? If so, you need to
specify StringEncoder as the "key.serializer.class". Otherwise, use the api
that doesn't specify the key.

    val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)

Thanks,

Jun


On Thu, Jul 18, 2013 at 9:19 PM, prashant amar <am...@gmail.com> wrote:

> Can anybody please help with this issue indicated below. I have not heard
> from anyone with a solution.
>
> Thanks in advance
> Amar
>
>
> On Tue, Jul 16, 2013 at 11:35 PM, prashant amar <am...@gmail.com>
> wrote:
>
> > Hello,
> >
> > Specified below is my code base where I am attempting to marshall a
> > complex type and receiving the follow error.
> > Am I missing anything here?
> >
> >  Sending Encoded Messages ..
> > [error] (run-main) java.lang.ClassCastException: java.lang.String cannot
> > be cast to com.test.groups.MemberRecord
> > java.lang.ClassCastException: java.lang.String cannot be cast to
> > com.test.groups.MemberRecord
> >
> > ~~~~~~~~~~~~~~~~~~~~~~~~~
> >
> > package com.test.groups
> >
> > import java.util._
> > import kafka.javaapi.producer.Producer
> > import kafka.producer.KeyedMessage
> > import kafka.producer.ProducerConfig
> > import kafka.serializer.Encoder
> > import kafka.message.Message
> > import java.io.ByteArrayOutputStream
> > import java.io.DataOutputStream
> > import kafka.serializer.Decoder
> > import java.io.ByteArrayInputStream
> > import java.io.DataInputStream
> > import kafka.utils.VerifiableProperties
> >
> >
> >
> > class MemberRecord(val memberId: Int, val name: String, val location:
> > String) {
> >   override def toString = {
> >     "(" + memberId + "," + name + "," + location + ")"
> >   }
> > }
> >
> > class MemberRecordEncoder(props: VerifiableProperties = null) extends
> > Encoder[MemberRecord] {
> >   def toBytes(member: MemberRecord): Array[Byte] = {
> >     val outputStream = new ByteArrayOutputStream()
> >     val dos = new DataOutputStream(outputStream)
> >     dos.writeInt(member.memberId)
> >     dos.writeUTF(member.name)
> >     dos.writeUTF(member.location)
> >     outputStream.flush
> >     outputStream.toByteArray
> >   }
> > }
> >
> > class MemberRecordDecoder(props: VerifiableProperties = null) extends
> > Decoder[MemberRecord] {
> >   def fromBytes(messageByte: Array[Byte]):MemberRecord = {
> >     val message = new Message(messageByte)
> >     val inputStream = new ByteArrayInputStream(message.payload.array,
> > message.payload.arrayOffset, message.payload.limit)
> >     val dataInputStream = new DataInputStream(inputStream)
> >     new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
> > dataInputStream.readUTF)
> >   }
> > }
> >
> >
> > object SimpleDataProducer {
> >   def main(args: Array[String]) {
> >     val events = 100
> >
> >     val eprops = new Properties
> >     eprops.put("metadata.broker.list", "localhost:9092")
> >     eprops.put("serializer.class",
> "com.test.groups.MemberRecordEncoder");
> >     eprops.put("request.required.acks", "1")
> >
> >
> >     val econfg = new ProducerConfig(eprops)
> >     val eproducer = new Producer[String, MemberRecord](econfg)
> >
> >     val dataP = new SimpleDataProducer
> >     println(" Sending Encoded Messages .. ")
> >     dataP.sendEncodedMessage(10,eproducer)
> >
> >     println(" Shutting down Producer ")
> >     eproducer.close
> >     println(" Successfully shut down Producer ")
> >   }
> > }
> >
> > class SimpleDataProducer {
> >
> >   val rnd = new Random
> >
> >   def sendEncodedMessage(nEvents: Int, producer: Producer[String,
> > MemberRecord]) {
> >     for (nEvents <- 0 to nEvents) {
> >     val message = new MemberRecord(rnd.nextInt(255), "John", "US")
> >     val producerData = new KeyedMessage[String,
> > MemberRecord]("topic-Encoded", "encode", message)
> >     producer.send(producerData)
> >     }
> >   }
> >
> > }
> >
> >
> >
>

Re: Message Encoding in Kafka 0.8

Posted by prashant amar <am...@gmail.com>.
Can anybody please help with this issue indicated below. I have not heard
from anyone with a solution.

Thanks in advance
Amar


On Tue, Jul 16, 2013 at 11:35 PM, prashant amar <am...@gmail.com> wrote:

> Hello,
>
> Specified below is my code base where I am attempting to marshall a
> complex type and receiving the follow error.
> Am I missing anything here?
>
>  Sending Encoded Messages ..
> [error] (run-main) java.lang.ClassCastException: java.lang.String cannot
> be cast to com.test.groups.MemberRecord
> java.lang.ClassCastException: java.lang.String cannot be cast to
> com.test.groups.MemberRecord
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~
>
> package com.test.groups
>
> import java.util._
> import kafka.javaapi.producer.Producer
> import kafka.producer.KeyedMessage
> import kafka.producer.ProducerConfig
> import kafka.serializer.Encoder
> import kafka.message.Message
> import java.io.ByteArrayOutputStream
> import java.io.DataOutputStream
> import kafka.serializer.Decoder
> import java.io.ByteArrayInputStream
> import java.io.DataInputStream
> import kafka.utils.VerifiableProperties
>
>
>
> class MemberRecord(val memberId: Int, val name: String, val location:
> String) {
>   override def toString = {
>     "(" + memberId + "," + name + "," + location + ")"
>   }
> }
>
> class MemberRecordEncoder(props: VerifiableProperties = null) extends
> Encoder[MemberRecord] {
>   def toBytes(member: MemberRecord): Array[Byte] = {
>     val outputStream = new ByteArrayOutputStream()
>     val dos = new DataOutputStream(outputStream)
>     dos.writeInt(member.memberId)
>     dos.writeUTF(member.name)
>     dos.writeUTF(member.location)
>     outputStream.flush
>     outputStream.toByteArray
>   }
> }
>
> class MemberRecordDecoder(props: VerifiableProperties = null) extends
> Decoder[MemberRecord] {
>   def fromBytes(messageByte: Array[Byte]):MemberRecord = {
>     val message = new Message(messageByte)
>     val inputStream = new ByteArrayInputStream(message.payload.array,
> message.payload.arrayOffset, message.payload.limit)
>     val dataInputStream = new DataInputStream(inputStream)
>     new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
> dataInputStream.readUTF)
>   }
> }
>
>
> object SimpleDataProducer {
>   def main(args: Array[String]) {
>     val events = 100
>
>     val eprops = new Properties
>     eprops.put("metadata.broker.list", "localhost:9092")
>     eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
>     eprops.put("request.required.acks", "1")
>
>
>     val econfg = new ProducerConfig(eprops)
>     val eproducer = new Producer[String, MemberRecord](econfg)
>
>     val dataP = new SimpleDataProducer
>     println(" Sending Encoded Messages .. ")
>     dataP.sendEncodedMessage(10,eproducer)
>
>     println(" Shutting down Producer ")
>     eproducer.close
>     println(" Successfully shut down Producer ")
>   }
> }
>
> class SimpleDataProducer {
>
>   val rnd = new Random
>
>   def sendEncodedMessage(nEvents: Int, producer: Producer[String,
> MemberRecord]) {
>     for (nEvents <- 0 to nEvents) {
>     val message = new MemberRecord(rnd.nextInt(255), "John", "US")
>     val producerData = new KeyedMessage[String,
> MemberRecord]("topic-Encoded", "encode", message)
>     producer.send(producerData)
>     }
>   }
>
> }
>
>
>

Re: Message Encoding in Kafka 0.8

Posted by prashant amar <am...@gmail.com>.
Can anybody please help with this issue indicated below. I have not heard
from anyone with a solution.

Thanks in advance
Amar


On Tue, Jul 16, 2013 at 11:35 PM, prashant amar <am...@gmail.com> wrote:

> Hello,
>
> Specified below is my code base where I am attempting to marshall a
> complex type and receiving the follow error.
> Am I missing anything here?
>
>  Sending Encoded Messages ..
> [error] (run-main) java.lang.ClassCastException: java.lang.String cannot
> be cast to com.test.groups.MemberRecord
> java.lang.ClassCastException: java.lang.String cannot be cast to
> com.test.groups.MemberRecord
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~
>
> package com.test.groups
>
> import java.util._
> import kafka.javaapi.producer.Producer
> import kafka.producer.KeyedMessage
> import kafka.producer.ProducerConfig
> import kafka.serializer.Encoder
> import kafka.message.Message
> import java.io.ByteArrayOutputStream
> import java.io.DataOutputStream
> import kafka.serializer.Decoder
> import java.io.ByteArrayInputStream
> import java.io.DataInputStream
> import kafka.utils.VerifiableProperties
>
>
>
> class MemberRecord(val memberId: Int, val name: String, val location:
> String) {
>   override def toString = {
>     "(" + memberId + "," + name + "," + location + ")"
>   }
> }
>
> class MemberRecordEncoder(props: VerifiableProperties = null) extends
> Encoder[MemberRecord] {
>   def toBytes(member: MemberRecord): Array[Byte] = {
>     val outputStream = new ByteArrayOutputStream()
>     val dos = new DataOutputStream(outputStream)
>     dos.writeInt(member.memberId)
>     dos.writeUTF(member.name)
>     dos.writeUTF(member.location)
>     outputStream.flush
>     outputStream.toByteArray
>   }
> }
>
> class MemberRecordDecoder(props: VerifiableProperties = null) extends
> Decoder[MemberRecord] {
>   def fromBytes(messageByte: Array[Byte]):MemberRecord = {
>     val message = new Message(messageByte)
>     val inputStream = new ByteArrayInputStream(message.payload.array,
> message.payload.arrayOffset, message.payload.limit)
>     val dataInputStream = new DataInputStream(inputStream)
>     new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
> dataInputStream.readUTF)
>   }
> }
>
>
> object SimpleDataProducer {
>   def main(args: Array[String]) {
>     val events = 100
>
>     val eprops = new Properties
>     eprops.put("metadata.broker.list", "localhost:9092")
>     eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
>     eprops.put("request.required.acks", "1")
>
>
>     val econfg = new ProducerConfig(eprops)
>     val eproducer = new Producer[String, MemberRecord](econfg)
>
>     val dataP = new SimpleDataProducer
>     println(" Sending Encoded Messages .. ")
>     dataP.sendEncodedMessage(10,eproducer)
>
>     println(" Shutting down Producer ")
>     eproducer.close
>     println(" Successfully shut down Producer ")
>   }
> }
>
> class SimpleDataProducer {
>
>   val rnd = new Random
>
>   def sendEncodedMessage(nEvents: Int, producer: Producer[String,
> MemberRecord]) {
>     for (nEvents <- 0 to nEvents) {
>     val message = new MemberRecord(rnd.nextInt(255), "John", "US")
>     val producerData = new KeyedMessage[String,
> MemberRecord]("topic-Encoded", "encode", message)
>     producer.send(producerData)
>     }
>   }
>
> }
>
>
>

Re: Message Encoding in Kafka 0.8

Posted by prashant amar <am...@gmail.com>.
Hello,

Specified below is my code base where I am attempting to marshall a complex
type and receiving the follow error.
Am I missing anything here?

 Sending Encoded Messages ..
[error] (run-main) java.lang.ClassCastException: java.lang.String cannot be
cast to com.test.groups.MemberRecord
java.lang.ClassCastException: java.lang.String cannot be cast to
com.test.groups.MemberRecord

~~~~~~~~~~~~~~~~~~~~~~~~~

package com.test.groups

import java.util._
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder
import kafka.message.Message
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import kafka.serializer.Decoder
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import kafka.utils.VerifiableProperties



class MemberRecord(val memberId: Int, val name: String, val location:
String) {
  override def toString = {
    "(" + memberId + "," + name + "," + location + ")"
  }
}

class MemberRecordEncoder(props: VerifiableProperties = null) extends
Encoder[MemberRecord] {
  def toBytes(member: MemberRecord): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val dos = new DataOutputStream(outputStream)
    dos.writeInt(member.memberId)
    dos.writeUTF(member.name)
    dos.writeUTF(member.location)
    outputStream.flush
    outputStream.toByteArray
  }
}

class MemberRecordDecoder(props: VerifiableProperties = null) extends
Decoder[MemberRecord] {
  def fromBytes(messageByte: Array[Byte]):MemberRecord = {
    val message = new Message(messageByte)
    val inputStream = new ByteArrayInputStream(message.payload.array,
message.payload.arrayOffset, message.payload.limit)
    val dataInputStream = new DataInputStream(inputStream)
    new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
dataInputStream.readUTF)
  }
}


object SimpleDataProducer {
  def main(args: Array[String]) {
    val events = 100

    val eprops = new Properties
    eprops.put("metadata.broker.list", "localhost:9092")
    eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
    eprops.put("request.required.acks", "1")


    val econfg = new ProducerConfig(eprops)
    val eproducer = new Producer[String, MemberRecord](econfg)

    val dataP = new SimpleDataProducer
    println(" Sending Encoded Messages .. ")
    dataP.sendEncodedMessage(10,eproducer)

    println(" Shutting down Producer ")
    eproducer.close
    println(" Successfully shut down Producer ")
  }
}

class SimpleDataProducer {

  val rnd = new Random

  def sendEncodedMessage(nEvents: Int, producer: Producer[String,
MemberRecord]) {
    for (nEvents <- 0 to nEvents) {
    val message = new MemberRecord(rnd.nextInt(255), "John", "US")
    val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)
    producer.send(producerData)
    }
  }

}

Re: Message Encoding in Kafka 0.8

Posted by prashant amar <am...@gmail.com>.
Hello,

Specified below is my code base where I am attempting to marshall a complex
type and receiving the follow error.
Am I missing anything here?

 Sending Encoded Messages ..
[error] (run-main) java.lang.ClassCastException: java.lang.String cannot be
cast to com.test.groups.MemberRecord
java.lang.ClassCastException: java.lang.String cannot be cast to
com.test.groups.MemberRecord

~~~~~~~~~~~~~~~~~~~~~~~~~

package com.test.groups

import java.util._
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder
import kafka.message.Message
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import kafka.serializer.Decoder
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import kafka.utils.VerifiableProperties



class MemberRecord(val memberId: Int, val name: String, val location:
String) {
  override def toString = {
    "(" + memberId + "," + name + "," + location + ")"
  }
}

class MemberRecordEncoder(props: VerifiableProperties = null) extends
Encoder[MemberRecord] {
  def toBytes(member: MemberRecord): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val dos = new DataOutputStream(outputStream)
    dos.writeInt(member.memberId)
    dos.writeUTF(member.name)
    dos.writeUTF(member.location)
    outputStream.flush
    outputStream.toByteArray
  }
}

class MemberRecordDecoder(props: VerifiableProperties = null) extends
Decoder[MemberRecord] {
  def fromBytes(messageByte: Array[Byte]):MemberRecord = {
    val message = new Message(messageByte)
    val inputStream = new ByteArrayInputStream(message.payload.array,
message.payload.arrayOffset, message.payload.limit)
    val dataInputStream = new DataInputStream(inputStream)
    new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
dataInputStream.readUTF)
  }
}


object SimpleDataProducer {
  def main(args: Array[String]) {
    val events = 100

    val eprops = new Properties
    eprops.put("metadata.broker.list", "localhost:9092")
    eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
    eprops.put("request.required.acks", "1")


    val econfg = new ProducerConfig(eprops)
    val eproducer = new Producer[String, MemberRecord](econfg)

    val dataP = new SimpleDataProducer
    println(" Sending Encoded Messages .. ")
    dataP.sendEncodedMessage(10,eproducer)

    println(" Shutting down Producer ")
    eproducer.close
    println(" Successfully shut down Producer ")
  }
}

class SimpleDataProducer {

  val rnd = new Random

  def sendEncodedMessage(nEvents: Int, producer: Producer[String,
MemberRecord]) {
    for (nEvents <- 0 to nEvents) {
    val message = new MemberRecord(rnd.nextInt(255), "John", "US")
    val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)
    producer.send(producerData)
    }
  }

}