Posted to users@kafka.apache.org by Haoming Zhang <ha...@outlook.com> on 2014/11/21 00:57:57 UTC

Partition Key Cannot Be Sent Out by Producer

Hi all,

I'm a beginner with Kafka, and I'm currently stuck on how to send a KeyedMessage with a producer. I would like to design a partition
function that routes each message based on its key, but the producer cannot send the KeyedMessage and I get this exception:
java.lang.ClassCastException: [B cannot be cast to java.lang.String

What I tried was to hardcode a partition key (I tried both String and Integer; currently it is an Integer), then convert the partition key to a byte array:
          val converter = new DataTypeConvertion
          val hardKey = 2
          val partkey = converter.intToByteArray(hardKey)
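DataTypeConvertion.intToByteArray is the poster's own helper and its code isn't shown. As a hedged sketch, assuming it produces a 4-byte big-endian encoding, an equivalent built on java.nio.ByteBuffer, plus the String route (getBytes with UTF-8), might look like this. The object and method names are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helpers; not part of Kafka or of the poster's DataTypeConvertion class.
object KeyBytes {
  // Encode an Int as 4 big-endian bytes (ByteBuffer's default byte order).
  def intKey(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()

  // Decode 4 big-endian bytes back into an Int.
  def intFromKey(bytes: Array[Byte]): Int = ByteBuffer.wrap(bytes).getInt

  // Encode a String key as UTF-8; StandardCharsets avoids looking the charset up by name.
  def stringKey(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
}
```

Either way the key handed to KeyedMessage is still an Array[Byte], so whatever key encoder the producer is configured with has to accept byte arrays.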

Then I create a KeyedMessage with the following function:

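  private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): KeyedMessage[Key, Val] = {
    // topic.get would throw NoSuchElementException for None; fall back to "" so require reports it
    val t = topic.getOrElse("")
    require(!t.isEmpty, "Topic must not be empty")
    key match {
      case Some(k) => new KeyedMessage(t, k, value) // keyed: k is also used as the partition key
      case None    => new KeyedMessage(t, value)    // unkeyed: key is null
    }
  }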

Then I try to send the KeyedMessage with a Kafka producer:

  def send(key: Key, value: Val, topic: Option[String] = None): Unit = {
    val msg = toMessage(value, Option(key), topic)
    println(msg)
    println("msg.key: " + msg.key)
    println("msg.message: " + msg.message)
    println("msg.partKey: " + msg.partKey)
    println("msg.topic: " + msg.topic)
    try {
      p.send(msg) // p is the Producer[Key, Val] instance; the exception is thrown here
    } catch {
      case e: Exception =>
        e.printStackTrace()
        System.exit(1)
    }
  }

As you can see, I added several print statements to the function above; this is its output:
KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
msg.key: [B@7ad40950
msg.message: [B@7ce18764
msg.partKey: [B@7ad40950
msg.topic: testingInput

The key of the KeyedMessage is displayed as [B@7ad40950, which I think is a memory address, and the exception (java.lang.ClassCastException: [B cannot be cast to java.lang.String) happens when the "send" function tries to convert the byte array to a String.
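For reference, [B@7ad40950 is the JVM's default toString for a byte array: [B is the type descriptor for byte[], and the part after @ is the array's identity hash code in hex (not strictly a memory address). To see the actual contents while debugging, a small helper along these lines could be used; the object and method names are hypothetical.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical debug helpers for printing byte-array keys and values.
object ByteDebug {
  // Hex dump, e.g. Array[Byte](0, 0, 0, 2) -> "00 00 00 02".
  def hex(bytes: Array[Byte]): String = bytes.map(b => f"${b & 0xff}%02x").mkString(" ")

  // Decode as UTF-8; only meaningful if the bytes were produced from a String.
  def utf8(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val key = "2".getBytes(StandardCharsets.UTF_8)
    println(key)       // default toString: "[B@" followed by a hex identity hash
    println(hex(key))  // prints "32"
    println(utf8(key)) // prints "2"
  }
}
```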

Am I wrong to create the key as a byte array?
Some examples of how to use KeyedMessage would be great!

Regards,
Haoming


RE: Partition Key Cannot Be Sent Out by Producer

Posted by Haoming Zhang <ha...@outlook.com>.
Hi Joe,

I just tried it, and it works! Now I need to think about how to design the partition function.

Thanks!
Haoming
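One pitfall worth checking for the partition function: JVM arrays use identity-based hashCode, so two Array[Byte] keys with the same contents generally hash differently. If the partitioner hashes the key object directly (I believe the 0.8 DefaultPartitioner takes abs(key.hashCode) modulo the partition count, but verify this against your Kafka version), equal byte-array keys may land on different partitions. A minimal content-based sketch follows; it is a plain hypothetical object, not an implementation of Kafka's Partitioner interface.

```scala
import java.util.Arrays

// Hypothetical content-based partition function for byte-array keys.
object BytesPartitioner {
  def partition(key: Array[Byte], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Arrays.hashCode hashes the contents, so equal keys give equal hashes.
    val h = Arrays.hashCode(key)
    // Non-negative modulo (avoids abs(Int.MinValue) staying negative);
    // safe because numPartitions is small, so 2 * numPartitions cannot overflow.
    ((h % numPartitions) + numPartitions) % numPartitions
  }
}
```

Hashing on contents rather than on array identity keeps every message with the same logical key in the same partition, which is usually the point of keyed routing.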

> Date: Thu, 20 Nov 2014 20:44:29 -0500
> Subject: Re: Partition Key Cannot be Send Out by Producer
> From: joe.stein@stealth.ly
> To: users@kafka.apache.org
> 
> Yes, that is what I was thinking: you don't need to set the serializer
> class if you want Array[Byte], since that is the default. Remove the line
> c.put("key.serializer.class", "kafka.serializer.StringEncoder") and you
> should either see it work or have to work through the next issue,
> hopefully the former =8^)
> 
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
> 
> On Thu, Nov 20, 2014 at 8:42 PM, Haoming Zhang <ha...@outlook.com> wrote:
> 
> > Hi Joe,
> >
> > You reminded me: maybe I included the wrong serializer.
> >
> > Here is how I created the producer:
> >       And(s"a synchronous Kafka producer app that writes to the topic $inputTopic")
> >       val producerApp = {
> >         val config = {
> >           val c = new Properties
> >           c.put("producer.type", "sync")
> >           c.put("client.id", "kafka-spark-streaming-test-sync-producer")
> >           c.put("request.required.acks", "1")
> >           c.put("key.serializer.class", "kafka.serializer.StringEncoder")
> >           c
> >         }
> >         kafkaZkCluster.createProducer(inputTopic.name, config).get
> >       }
> >
> > I have set "key.serializer.class", but I'm not sure whether I did it
> > correctly.
> >
> > The following is the error message:
> >
> > 14/11/20 17:08:11 INFO KafkaSparkStreamingSpec: Synchronously sending Tweet {"username": "ANY_USER_1", "text": "ANY_TEXT_1", "timestamp": 1416532086} to topic Some(testingInput)
> > [B@3eb78a85
> > KeyedMessage(testingInput,[B@3eb78a85,[B@3eb78a85,[B@3bf54b00)
> > msg.key: [B@3eb78a85
> > msg.message: [B@3bf54b00
> > msg.partKey: [B@3eb78a85
> > msg.topic: testingInput
> > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> >     at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
> >     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
> >     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >     at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
> >     at kafka.producer.Producer.send(Producer.scala:76)
> >     at com.cisco.npa.kafka.KafkaProducerApp.send(KafkaProducerApp.scala:87)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:171)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:153)
> >     at scala.collection.immutable.List.foreach(List.scala:318)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(KafkaSparkStreamingSpec.scala:153)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
> >     at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> >     at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> >     at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> >     at org.scalatest.Transformer.apply(Transformer.scala:22)
> >     at org.scalatest.Transformer.apply(Transformer.scala:20)
> >     at org.scalatest.FeatureSpecLike$$anon$1.apply(FeatureSpecLike.scala:199)
> >     at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
> >     at org.scalatest.FeatureSpec.withFixture(FeatureSpec.scala:1836)
> >     at org.scalatest.FeatureSpecLike$class.invokeWithFixture$1(FeatureSpecLike.scala:196)
> >     at org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
> >     at org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
> >     at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> >     at org.scalatest.FeatureSpecLike$class.runTest(FeatureSpecLike.scala:208)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSparkStreamingSpec.scala:35)
> >     at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
> >     at com.cisco.npa.spark.KafkaSparkStreamingSpec.runTest(KafkaSparkStreamingSpec.scala:35)
> >     at org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
> >     at org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
> >     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
> >     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> >     at scala.collection.immutable.List.foreach(List.scala:318)
> >     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> >     at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
> >     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
> >     at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> >     at scala.collection.immutable.List.foreach(List.scala:318)
> >     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> >     at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
> >     at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
> >     at org.scalatest.FeatureSpecLike$class.runTests(FeatureSpecLike.scala:267)
> >     at org.scalatest.FeatureSpec.runTests(FeatureSpec.scala:1836)
> >     at org.scalatest.Suite$class.run(Suite.scala:1424)
> >     at org.scalatest.FeatureSpec.org$scalatest$FeatureSpecLike$$super$run(FeatureSpec.scala:1836)
> >     at org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
> >     at org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
> >     at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
> >     at org.scalatest.FeatureSpecLike$class.run(FeatureSpecLike.scala:309)
> >     at org.scalatest.FeatureSpec.run(FeatureSpec.scala:1836)
> >     at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
> >     at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
> >     at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
> >     at scala.collection.immutable.List.foreach(List.scala:318)
> >     at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
> >     at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
> >     at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
> >     at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
> >     at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
> >     at org.scalatest.tools.Runner$.main(Runner.scala:860)
> >     at org.scalatest.tools.Runner.main(Runner.scala)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:606)
> >     at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher$.main(ScalaTestLauncher.scala:58)
> >     at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher.main(ScalaTestLauncher.scala)
> >
> > Thanks,
> > Haoming
> >
> > > Date: Thu, 20 Nov 2014 20:29:44 -0500
> > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > From: joe.stein@stealth.ly
> > > To: users@kafka.apache.org
> > >
> > > It would be helpful to see the full stack trace. Also, how have you
> > > instantiated your Producer class? Did you set a value for
> > > "serializer.class" in the properties?
> > >
> > > On Thu, Nov 20, 2014 at 8:15 PM, Haoming Zhang <haoming.zhang@outlook.com> wrote:
> > >
> > > > Hi Harsha,
> > > >
> > > > I just tried hardcoding a string message and converting it to a
> > > > byte array, but no luck...
> > > >
> > > > The following is how my program works:
> > > >
> > > > I create a hardcoded String key and convert it to a byte array, then
> > > > iterate over the network messages and send them one by one:
> > > >       networkelements foreach {
> > > >         case networkelement =>
> > > >           val bytes = Injection(networkelement)
> > > >           logger.info(s"Synchronously sending Tweet $networkelement to topic ${producerApp.defaultTopic}")
> > > >
> > > >           val hardKey = "2"
> > > >           val parkey = hardKey.getBytes("UTF8")
> > > >           val topic = producerApp.defaultTopic
> > > >           producerApp.send(parkey, bytes, topic)
> > > >       }
> > > >
> > > >
> > > > Here is how networkelements is created; NetworkElement is a class
> > > > generated by Avro, so I think you can ignore it:
> > > > val networkelements = fixture.messages
> > > >
> > > >   val fixture = {
> > > >     val BeginningOfEpoch = 0.seconds
> > > >     val AnyTimestamp = 1234.seconds
> > > >     val now = System.currentTimeMillis().millis
> > > >
> > > >     new {
> > > >       val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
> > > >       val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
> > > >       val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
> > > >
> > > >       val messages = Seq(t1, t2, t3)
> > > >     }
> > > >   }
> > > >
> > > > BTW, I defined the Key and Val types as follows:
> > > >
> > > >   type Key = Array[Byte]
> > > >   type Val = Array[Byte]
> > > >
> > > > Thanks,
> > > > Haoming
> > > >
> > > > > From: kafka@harsha.io
> > > > > To: users@kafka.apache.org
> > > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > > Date: Thu, 20 Nov 2014 16:59:19 -0800
> > > > >
> > > > > Also, the "value" in (key: Key, value: Val, topic: Option[String])
> > > > > should be a string converted to a byte array.
> > > > > Can you send an example of your key and value data?
> > > > >
> > > > >
> > > > > On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> > > > > > Hi Harsha,
> > > > > >
> > > > > > Thanks for the suggestion!
> > > > > >
> > > > > > I had checked this link before, and I tried to create the partition
> > > > > > key like this:
> > > > > >           val hardKey = "2"
> > > > > >           val parkey = hardKey.getBytes("UTF8")
> > > > > >
> > > > > > But I still get the same exception. I also tried changing "UTF8" to
> > > > > > "UTF-8", but no luck...
> > > > > >
> > > > > > Regards,
> > > > > > Haoming
> > > > > >
> > > > > > > From: kafka@harsha.io
> > > > > > > To: users@kafka.apache.org
> > > > > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > > > > Date: Thu, 20 Nov 2014 16:43:11 -0800
> > > > > > >
> > > > > > > Hi Haoming,
> > > > > > > Take a look at the code here:
> > > > > > > https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
> > > > > > > Your partKey should be a String, and when you convert it into a
> > > > > > > byte array you can use partKey.getBytes("UTF8").
> > > > > > > -Harsha
> > > > > > >
> > > > > >
> > > >
> > > >
> >
> >

Re: Partition Key Cannot Be Sent Out by Producer

Posted by Joe Stein <jo...@stealth.ly>.
Yes, that is what I was thinking: you don't need to set the serializer
class if you want Array[Byte], since that is the default. Remove the line
c.put("key.serializer.class", "kafka.serializer.StringEncoder") and you
should either see it work or have to work through the next issue,
hopefully the former =8^)
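The failure mode can be reproduced without Kafka. This is a minimal sketch under stated assumptions: the Encoder trait, the two encoders, and the string-keyed registry are simplified, hypothetical stand-ins (not Kafka's classes) for a producer that instantiates its key encoder from a configured class name. Because the encoder is picked at runtime and the cast to Encoder[K] is unchecked under type erasure, pairing a String encoder with Array[Byte] keys compiles fine and only fails when toBytes runs.

```scala
// Simplified stand-ins for illustration; these are NOT Kafka's own classes.
trait Encoder[T] { def toBytes(t: T): Array[Byte] }

class StringEncoder extends Encoder[String] {
  def toBytes(s: String): Array[Byte] = s.getBytes("UTF-8")
}

class PassThroughEncoder extends Encoder[Array[Byte]] {
  def toBytes(b: Array[Byte]): Array[Byte] = b
}

object EncoderDemo {
  // Hypothetical registry standing in for "instantiate the class named in the config".
  private val registry: Map[String, () => Encoder[_]] = Map(
    "string" -> (() => new StringEncoder),
    "passthrough" -> (() => new PassThroughEncoder)
  )

  // Unchecked cast (type erasure): a mismatched encoder is only detected
  // later, when toBytes receives a value of the wrong runtime type.
  def load[K](name: String): Encoder[K] = registry(name)().asInstanceOf[Encoder[K]]

  // Returns the encoded key, or the name of the exception that was thrown.
  def encodeKey(name: String, key: Array[Byte]): Either[String, Array[Byte]] =
    try Right(load[Array[Byte]](name).toBytes(key))
    catch { case e: ClassCastException => Left(e.getClass.getName) }
}
```

With the "passthrough" entry instead of "string", the same key encodes cleanly, which matches the fix of dropping the StringEncoder setting and falling back to an encoder that accepts Array[Byte].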

On Thu, Nov 20, 2014 at 8:42 PM, Haoming Zhang <ha...@outlook.com>
wrote:

> Hi Joe,
>
> You remind me, maybe I included the incorrect serializer.
>
> Here is how I created the producer:
>       And(s"a synchronous Kafka producer app that writes to the topic
> $inputTopic")
>       val producerApp = {
>         val config = {
>           val c = new Properties
>           c.put("producer.type", "sync")
>           c.put("client.id", "kafka-spark-streaming-test-sync-producer")
>           c.put("request.required.acks", "1")
>           c.put("key.serializer.class", "kafka.serializer.StringEncoder")
>           c
>         }
>         kafkaZkCluster.createProducer(inputTopic.name, config).get
>       }
>
> I have included the "key.serializer.class", but I'm not sure whether I did
> correct..
>
> The following is the error message:
>
> 14/11/20 17:08:11 INFO KafkaSparkStreamingSpec: Synchronously sending
> Tweet {"username": "ANY_USER_1", "text": "ANY_TEXT_1", "timestamp":
> 1416532086} to topic Some(testingInput)
> [B@3eb78a85
> KeyedMessage(testingInput,[B@3eb78a85,[B@3eb78a85,[B@3bf54b00)
> msg.key: [B@3eb78a85
> msg.message: [B@3bf54b00
> msg.partKey: [B@3eb78a85
> msg.topic: testingInput
> java.lang.ClassCastException: [B cannot be cast to java.lang.String
>     at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
>     at
> kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
>     at
> kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
>     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at
> kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
>     at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
>     at kafka.producer.Producer.send(Producer.scala:76)
>     at com.cisco.npa.kafka.KafkaProducerApp.send(KafkaProducerApp.scala:87)
>     at
> com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:171)
>     at
> com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:153)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at
> com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(KafkaSparkStreamingSpec.scala:153)
>     at
> com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
>     at
> com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
>     at
> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>     at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>     at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>     at org.scalatest.Transformer.apply(Transformer.scala:22)
>     at org.scalatest.Transformer.apply(Transformer.scala:20)
>     at
> org.scalatest.FeatureSpecLike$$anon$1.apply(FeatureSpecLike.scala:199)
>     at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
>     at org.scalatest.FeatureSpec.withFixture(FeatureSpec.scala:1836)
>     at
> org.scalatest.FeatureSpecLike$class.invokeWithFixture$1(FeatureSpecLike.scala:196)
>     at
> org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
>     at
> org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
>     at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>     at
> org.scalatest.FeatureSpecLike$class.runTest(FeatureSpecLike.scala:208)
>     at com.cisco.npa.spark.KafkaSparkStreamingSpec.org
> $scalatest$BeforeAndAfterEach$$super$runTest(KafkaSparkStreamingSpec.scala:35)
>     at
> org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
>     at
> com.cisco.npa.spark.KafkaSparkStreamingSpec.runTest(KafkaSparkStreamingSpec.scala:35)
>     at
> org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
>     at
> org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
>     at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
>     at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>     at org.scalatest.SuperEngine.org
> $scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
>     at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
>     at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>     at org.scalatest.SuperEngine.org
> $scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
>     at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
>     at
> org.scalatest.FeatureSpecLike$class.runTests(FeatureSpecLike.scala:267)
>     at org.scalatest.FeatureSpec.runTests(FeatureSpec.scala:1836)
>     at org.scalatest.Suite$class.run(Suite.scala:1424)
>     at org.scalatest.FeatureSpec.org
> $scalatest$FeatureSpecLike$$super$run(FeatureSpec.scala:1836)
>     at
> org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
>     at
> org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
>     at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
>     at org.scalatest.FeatureSpecLike$class.run(FeatureSpecLike.scala:309)
>     at org.scalatest.FeatureSpec.run(FeatureSpec.scala:1836)
>     at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>     at
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>     at
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>     at
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>     at
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>     at
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>     at
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>     at org.scalatest.tools.Runner$.main(Runner.scala:860)
>     at org.scalatest.tools.Runner.main(Runner.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at
> scala.tools.eclipse.scalatest.launching.ScalaTestLauncher$.main(ScalaTestLauncher.scala:58)
>     at
> scala.tools.eclipse.scalatest.launching.ScalaTestLauncher.main(ScalaTestLauncher.scala)
>
> Thanks,
> Haoming
>
> > Date: Thu, 20 Nov 2014 20:29:44 -0500
> > Subject: Re: Partition Key Cannot be Send Out by Producer
> > From: joe.stein@stealth.ly
> > To: users@kafka.apache.org
> >
> > iI would helpful to see the full stack trace. Also, how have you
> > instantiated your Producer class? Did you set a value for
> > "serializer.class" in the property?
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> > On Thu, Nov 20, 2014 at 8:15 PM, Haoming Zhang <
> haoming.zhang@outlook.com>
> > wrote:
> >
> > > Hi Harsha,
> > >
> > > I just tried to hard code a string message, then convert the message to
> > > byte array, but no lucky...
> > >
> > > The following is how my program works:
> > >
> > > Create a hardcode key, which is String, then convert to byte array,
> > > iterate a network message, send the message one by one:
> > >       networkelements foreach {
> > >         case networkelement =>
> > >           val bytes = Injection(networkelement)
> > >           logger.info(s"Synchronously sending Tweet $networkelement to
> > > topic ${producerApp.defaultTopic}")
> > >
> > >           val hardKey = "2"
> > >           val parkey = hardKey.getBytes("UTF8")
> > >           val topic = producerApp.defaultTopic
> > >           producerApp.send(parkey, bytes, topic)
> > >       }
> > >
> > >
> > > Here is how the networkelements created, where NetworkElement is a
> class
> > > that created by avro, I think you can ignore it:
> > > val networkelements = fixture.messages
> > >
> > >   val fixture = {
> > >     val BeginningOfEpoch = 0.seconds
> > >     val AnyTimestamp = 1234.seconds
> > >     val now = System.currentTimeMillis().millis
> > >
> > >     new {
> > >       val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1",
> > > now.toSeconds)
> > >       val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2",
> > > BeginningOfEpoch.toSeconds)
> > >       val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3",
> > > AnyTimestamp.toSeconds)
> > >
> > >       val messages = Seq(t1, t2, t3)
> > >     }
> > >   }
> > >
> > > BTW, I defined the Key and Val types as following:
> > >
> > >   type Key = Array[Byte]
> > >   type Val = Array[Byte]
> > >
> > > Thanks,
> > > Haoming
> > >
> > > > From: kafka@harsha.io
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > Date: Thu, 20 Nov 2014 16:59:19 -0800
> > > >
> > > > also the (key: Key, value: Val, topic: Option[String]) "value"
> should be
> > > > a string converted to a byte array.
> > > > Can you send a example of your key and value data.

RE: Partition Key Cannot be Send Out by Producer

Posted by Haoming Zhang <ha...@outlook.com>.
Hi Joe,

You reminded me: maybe I included the wrong serializer.

Here is how I created the producer:
      And(s"a synchronous Kafka producer app that writes to the topic $inputTopic")
      val producerApp = {
        val config = {
          val c = new Properties
          c.put("producer.type", "sync")
          c.put("client.id", "kafka-spark-streaming-test-sync-producer")
          c.put("request.required.acks", "1")
          c.put("key.serializer.class", "kafka.serializer.StringEncoder")
          c
        }
        kafkaZkCluster.createProducer(inputTopic.name, config).get
      }

I have set "key.serializer.class", but I'm not sure whether I did it correctly...

The following is the error message:

14/11/20 17:08:11 INFO KafkaSparkStreamingSpec: Synchronously sending Tweet {"username": "ANY_USER_1", "text": "ANY_TEXT_1", "timestamp": 1416532086} to topic Some(testingInput)
[B@3eb78a85
KeyedMessage(testingInput,[B@3eb78a85,[B@3eb78a85,[B@3bf54b00)
msg.key: [B@3eb78a85
msg.message: [B@3bf54b00
msg.partKey: [B@3eb78a85
msg.topic: testingInput
java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
    at kafka.producer.Producer.send(Producer.scala:76)
    at com.cisco.npa.kafka.KafkaProducerApp.send(KafkaProducerApp.scala:87)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:171)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$3.apply(KafkaSparkStreamingSpec.scala:153)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(KafkaSparkStreamingSpec.scala:153)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KafkaSparkStreamingSpec.scala:117)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FeatureSpecLike$$anon$1.apply(FeatureSpecLike.scala:199)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FeatureSpec.withFixture(FeatureSpec.scala:1836)
    at org.scalatest.FeatureSpecLike$class.invokeWithFixture$1(FeatureSpecLike.scala:196)
    at org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
    at org.scalatest.FeatureSpecLike$$anonfun$runTest$1.apply(FeatureSpecLike.scala:208)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FeatureSpecLike$class.runTest(FeatureSpecLike.scala:208)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSparkStreamingSpec.scala:35)
    at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
    at com.cisco.npa.spark.KafkaSparkStreamingSpec.runTest(KafkaSparkStreamingSpec.scala:35)
    at org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
    at org.scalatest.FeatureSpecLike$$anonfun$runTests$1.apply(FeatureSpecLike.scala:267)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FeatureSpecLike$class.runTests(FeatureSpecLike.scala:267)
    at org.scalatest.FeatureSpec.runTests(FeatureSpec.scala:1836)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FeatureSpec.org$scalatest$FeatureSpecLike$$super$run(FeatureSpec.scala:1836)
    at org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
    at org.scalatest.FeatureSpecLike$$anonfun$run$1.apply(FeatureSpecLike.scala:309)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FeatureSpecLike$class.run(FeatureSpecLike.scala:309)
    at org.scalatest.FeatureSpec.run(FeatureSpec.scala:1836)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.main(Runner.scala:860)
    at org.scalatest.tools.Runner.main(Runner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher$.main(ScalaTestLauncher.scala:58)
    at scala.tools.eclipse.scalatest.launching.ScalaTestLauncher.main(ScalaTestLauncher.scala)
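The top frame, kafka.serializer.StringEncoder.toBytes, narrows this down: with "key.serializer.class" set to kafka.serializer.StringEncoder, the producer most likely tries to encode the Array[Byte] key as a String. The sketch below is a simplified, hypothetical model of that failure; Encoder, StringEncoder and DefaultEncoder here are stand-ins, not the real kafka.serializer classes. Because of type erasure, an encoder chosen from configuration can be held as an Encoder[Array[Byte]] without any check, and the cast to String only happens inside toBytes, which would explain why the exception shows up at send time rather than when the producer is created.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-ins that only mimic the shape of the old kafka.serializer
// encoders; they are not the real Kafka classes.
trait Encoder[T] {
  def toBytes(t: T): Array[Byte]
}

// Expects a String. The compiler also emits an erased bridge method
// toBytes(Object) that casts its argument to String before calling this.
class StringEncoder extends Encoder[String] {
  def toBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
}

// Pass-through encoder for data that is already an Array[Byte].
class DefaultEncoder extends Encoder[Array[Byte]] {
  def toBytes(bytes: Array[Byte]): Array[Byte] = bytes
}

object EncoderMismatchDemo {
  type Key = Array[Byte]

  // A producer that builds its encoder from a class name in config can only
  // cast it to Encoder[Key]; with erasure this cast is not checked at runtime.
  def asKeyEncoder(encoder: Encoder[_]): Encoder[Key] =
    encoder.asInstanceOf[Encoder[Key]]

  // Left(e) when the configured encoder does not accept byte-array keys.
  def encodeKey(encoder: Encoder[_], key: Key): Either[ClassCastException, Array[Byte]] =
    try Right(asKeyEncoder(encoder).toBytes(key))
    catch { case e: ClassCastException => Left(e) }
}
```

Calling encodeKey with a StringEncoder yields a Left holding a ClassCastException, while the pass-through DefaultEncoder returns the key unchanged. The same reasoning suggests giving the real producer a byte-array encoder when Key is Array[Byte].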

Thanks,
Haoming

> Date: Thu, 20 Nov 2014 20:29:44 -0500
> Subject: Re: Partition Key Cannot be Send Out by Producer
> From: joe.stein@stealth.ly
> To: users@kafka.apache.org
> 
> It would be helpful to see the full stack trace. Also, how have you
> instantiated your Producer class? Did you set a value for
> "serializer.class" in the properties?

Re: Partition Key Cannot be Send Out by Producer

Posted by Joe Stein <jo...@stealth.ly>.
It would be helpful to see the full stack trace. Also, how have you
instantiated your Producer class? Did you set a value for
"serializer.class" in the properties?

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/
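Joe's question points at a common cause of this error with the old (0.8-era) Scala producer: the configured encoders must accept the Key and Val types. With both aliased to Array[Byte], a configuration along the lines of the sketch below should fit; kafka.serializer.DefaultEncoder is that producer's pass-through encoder for byte arrays. The helper name byteArrayProducerProps is hypothetical and the broker address is a placeholder assumption. If I recall the 0.8 producer config correctly, "key.serializer.class" falls back to "serializer.class", whose default is DefaultEncoder, so simply dropping a StringEncoder setting may also be enough.

```scala
import java.util.Properties

object ProducerConfigSketch {
  // Properties for an old-style (0.8) producer whose keys and values are Array[Byte].
  // The default broker address is a placeholder, not a value from this thread.
  def byteArrayProducerProps(brokers: String = "localhost:9092"): Properties = {
    val props = new Properties
    props.setProperty("metadata.broker.list", brokers)
    props.setProperty("producer.type", "sync")
    props.setProperty("request.required.acks", "1")
    // Pass-through encoders: the producer receives bytes and sends them unchanged.
    props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder")
    props.setProperty("key.serializer.class", "kafka.serializer.DefaultEncoder")
    props
  }
}
```

These Properties would then go to kafka.producer.ProducerConfig and a Producer[Array[Byte], Array[Byte]], or to whatever helper builds the producer in the test fixture.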

On Thu, Nov 20, 2014 at 8:15 PM, Haoming Zhang <ha...@outlook.com>
wrote:

> Hi Harsha,
>
> I just tried hard-coding a string message and converting it to a byte
> array, but no luck...
>
> The following is how my program works:
>
> I create a hard-coded String key, convert it to a byte array, then iterate
> over the network messages and send them one by one:
>       networkelements foreach {
>         case networkelement =>
>           val bytes = Injection(networkelement)
>           logger.info(s"Synchronously sending Tweet $networkelement to topic ${producerApp.defaultTopic}")
>
>           val hardKey = "2"
>           val parkey = hardKey.getBytes("UTF8")
>           val topic = producerApp.defaultTopic
>           producerApp.send(parkey, bytes, topic)
>       }
>
>
> Here is how networkelements is created. NetworkElement is a class generated
> by Avro, so I think you can ignore it:
> val networkelements = fixture.messages
>
>   val fixture = {
>     val BeginningOfEpoch = 0.seconds
>     val AnyTimestamp = 1234.seconds
>     val now = System.currentTimeMillis().millis
>
>     new {
>       val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
>       val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
>       val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
>
>       val messages = Seq(t1, t2, t3)
>     }
>   }
>
> BTW, I defined the Key and Val types as follows:
>
>   type Key = Array[Byte]
>   type Val = Array[Byte]
>
> Thanks,
> Haoming

RE: Partition Key Cannot be Send Out by Producer

Posted by Haoming Zhang <ha...@outlook.com>.
Hi Harsha,

I just tried hard-coding a string message and converting it to a byte array, but no luck...

The following is how my program works:

I create a hard-coded String key, convert it to a byte array, then iterate over the network messages and send them one by one:
      networkelements foreach {
        case networkelement =>
          val bytes = Injection(networkelement)
          logger.info(s"Synchronously sending Tweet $networkelement to topic ${producerApp.defaultTopic}")

          val hardKey = "2"
          val parkey = hardKey.getBytes("UTF8")
          val topic = producerApp.defaultTopic
          producerApp.send(parkey, bytes, topic)
      }
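The key built here is a valid byte array: "2".getBytes("UTF8") yields the single byte 0x32. The short sketch below (KeyBytesDemo and its methods are hypothetical helpers, not part of Kafka) shows an equivalent encoding with StandardCharsets.UTF_8 and a way to log a key readably, since printing an Array[Byte] directly gives output like [B@7ad40950, which is the array's type code plus its identity hash code rather than its contents.

```scala
import java.nio.charset.StandardCharsets

object KeyBytesDemo {
  // Encode with the StandardCharsets constant rather than the name "UTF8";
  // the result is the same, but there is no charset name to mistype.
  def keyBytes(key: String): Array[Byte] = key.getBytes(StandardCharsets.UTF_8)

  // An Array's default toString is "[B@" plus a hash code, not its contents,
  // so decode the bytes (and show them as hex) when logging a key.
  def describe(bytes: Array[Byte]): String = {
    val text = new String(bytes, StandardCharsets.UTF_8)
    val hex = bytes.map(b => "%02x".format(b & 0xff)).mkString(" ")
    s"$text (hex: $hex)"
  }
}
```

For example, describe(keyBytes("2")) gives "2 (hex: 32)", which is easier to check in logs than msg.key printed directly.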


Here is how networkelements is created. NetworkElement is a class generated by Avro, so I think you can ignore it:
val networkelements = fixture.messages

  val fixture = {
    val BeginningOfEpoch = 0.seconds
    val AnyTimestamp = 1234.seconds
    val now = System.currentTimeMillis().millis

    new {
      val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
      val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
      val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)

      val messages = Seq(t1, t2, t3)
    }
  }

BTW, I defined the Key and Val types as follows:

  type Key = Array[Byte]
  type Val = Array[Byte]
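Because Key and Val are aliases for Array[Byte], it is worth remembering for the planned key-based partition function that JVM arrays compare and hash by identity: two arrays holding the same bytes are not == and usually have different hashCode values. The sketch below hashes the array contents instead; BytesKeyPartitioning.partitionFor is a hypothetical helper, not a Kafka API. As far as I can tell, the 0.8 producer's default partitioner uses key.hashCode, so with byte-array keys a custom partitioner along these lines may be needed to keep equal keys on the same partition.

```scala
object BytesKeyPartitioning {
  // Hypothetical partition function for Array[Byte] keys; it mirrors the shape of
  // a partition(key, numPartitions) method but does not implement any Kafka trait.
  def partitionFor(key: Array[Byte], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Hash the contents. key.hashCode would be the array's identity hash, so two
    // arrays with the same bytes could land on different partitions.
    val h = java.util.Arrays.hashCode(key)
    // floorMod keeps the result in [0, numPartitions) even when h is negative.
    java.lang.Math.floorMod(h, numPartitions)
  }
}
```

With this approach, every message whose key is "2".getBytes("UTF8") maps to the same partition, no matter how many separate arrays are created for it.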

Thanks,
Haoming

> From: kafka@harsha.io
> To: users@kafka.apache.org
> Subject: Re: Partition Key Cannot be Send Out by Producer
> Date: Thu, 20 Nov 2014 16:59:19 -0800
> 
> Also, the "value" in (key: Key, value: Val, topic: Option[String]) should be
> a string converted to a byte array.
> Can you send an example of your key and value data?

Re: Partition Key Cannot be Send Out by Producer

Posted by Harsha <ka...@harsha.io>.
Also, the "value" in (key: Key, value: Val, topic: Option[String]) should be
a string converted to a byte array.
Can you send an example of your key and value data?


On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> Hi Harsha,
> 
> Thanks for the suggestion!
> 
> I had already checked this link, and I tried creating the partition key
> like this:
>           val hardKey = "2"
>           val parkey = hardKey.getBytes("UTF8")
> 
> But I still get the same exception. I also tried changing "UTF8" to "UTF-8",
> but no luck...
> 
> Regards,
> Haoming
> 
> > From: kafka@harsha.io
> > To: users@kafka.apache.org
> > Subject: Re: Partition Key Cannot be Send Out by Producer
> > Date: Thu, 20 Nov 2014 16:43:11 -0800
> > 
> > Hi Haoming,
> >           Take a look at the code here
> >           https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
> > for your partKey it should be string and when you converting it into
> > byte array you can use partKey.getBytes("UTF8")
> > -Harsha
> > 
> > On Thu, Nov 20, 2014, at 03:57 PM, Haoming Zhang wrote:
> > > Hi all,
> > > 
> > > I'm a beginner of Kafka, currently I'm stuck by how to send out a
> > > KeyedMessage by producer. I would like to design a partition
> > > function to route the message based on the key, but the producer cannot
> > > send the KeyedMessage and I got this exception:
> > > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > 
> > > What I tried is hardcode a partition key ( I tried String and Integer,
> > > currently it is Integer ), then convert the partition key to Byte Array:
> > >           val converter = new DataTypeConvertion
> > >           val hardKey = 2
> > >           val partkey = converter.intToByteArray(hardKey)
> > > 
> > >  Then create a KeyedMessage by the following function:
> > > 
> > >   private def toMessage(value: Val, key: Option[Key] = None, topic:
> > >   Option[String] = None): KeyedMessage[Key, Val] = {
> > >     val t = topic.get
> > >     require(!t.isEmpty, "Topic must not be empty")
> > >     key match {
> > >       case Some(key) => new KeyedMessage(t, key, value)
> > >       case _ => new KeyedMessage(t, value)
> > >     }
> > >   }
> > > 
> > > Then try to send the KeyedMessage by a Kafka producer:
> > > 
> > >   def send(key: Key, value: Val, topic: Option[String] = None) {
> > >     val msg = toMessage(value, Option(key), topic)
> > >     print(msg + "\n")
> > >     print("msg.key" + msg.key + "\n")
> > >     print("msg.message" + msg.message + "\n")
> > >     print("msg.partKey" + msg.partKey + "\n")
> > >     print("msg.topic" + msg.topic + "\n")
> > >     try {
> > >       p.send(msg) //P is an instance of producer, exception happens in
> > >       this line
> > >     } catch {
> > >       case e: Exception =>
> > >         e.printStackTrace()
> > >         System.exit(1)
> > >     }
> > >   }
> > > 
> > > As you can see, I added many print statement in the above function, and
> > > the following is the output of above function:
> > > KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
> > > msg.key: [B@7ad40950
> > > msg.message: [B@7ce18764
> > > msg.partKey: [B@7ad40950
> > > msg.topic: testingInput
> > > 
> > > The key of the KeyedMessage is displayed as [B@7ad40950. I think that is
> > > a memory address, and the exception (java.lang.ClassCastException: [B
> > > cannot be cast to java.lang.String) happens when the "send" function tries
> > > to convert the byte array to a String.
> > > 
> > > Am I wrong to create the key as a byte array?
> > > Some examples of how to use KeyedMessage would be great!
> > > 
> > > Regards,
> > > Haoming
> > > 
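The DataTypeConvertion helper is not shown in the thread. As a sketch only, assuming the Integer key is meant to become a 4-byte big-endian value, an intToByteArray conversion could be written with java.nio.ByteBuffer; the object name DataTypeConversion and the inverse byteArrayToInt are hypothetical:

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for the thread's DataTypeConvertion helper.
object DataTypeConversion {
  // Encode an Int as 4 bytes, big-endian (ByteBuffer's default byte order).
  def intToByteArray(value: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(value).array()

  // Inverse of intToByteArray, handy for checking what a key really contains.
  def byteArrayToInt(bytes: Array[Byte]): Int =
    ByteBuffer.wrap(bytes).getInt()
}
```

With this encoding, intToByteArray(2) yields the bytes 0, 0, 0, 2. Whatever the encoding, the result is still an Array[Byte], so the producer's key encoder has to accept byte arrays.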

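For reference, [B@7ad40950 is not the key's contents. For a JVM array, toString returns the runtime class name ([B denotes byte[]), an @, and the identity hash code in hex, which is not guaranteed to be a memory address. The ClassCastException means some code expected a String and received that byte array; where exactly that happens depends on the producer configuration, which the thread does not show. The standalone sketch below reproduces both effects without Kafka; keyAsString is a hypothetical stand-in for string-expecting code:

```scala
import java.nio.charset.StandardCharsets

object ByteKeyDemo {
  val key: Array[Byte] = "2".getBytes(StandardCharsets.UTF_8)

  // Array.toString: runtime class name "[B", '@', and the identity hash code in hex.
  def describe(bytes: Array[Byte]): String = bytes.toString

  // Readable views of the same bytes, better suited to debug prints.
  def contents(bytes: Array[Byte]): String = java.util.Arrays.toString(bytes)
  def decoded(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  // Hypothetical stand-in for code that assumes keys are Strings:
  // the cast compiles, but throws ClassCastException at runtime for a byte array.
  def keyAsString(k: Any): String = k.asInstanceOf[String]
}
```

Printing contents(key) or decoded(key) instead of the key itself shows the actual bytes ("[50]") or the original text ("2").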
RE: Partition Key Cannot be Send Out by Producer

Posted by Haoming Zhang <ha...@outlook.com>.
Hi Harsha,

Thanks for the suggestion!

I had checked this link before, and I tried creating the partition key like this:
          val hardKey = "2"
          val partkey = hardKey.getBytes("UTF8")

But I still get the same exception. I also tried "UTF-8" instead of "UTF8", with no luck...

Regards,
Haoming
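Changing the charset name cannot affect the exception: Java accepts "UTF8" as an alias of "UTF-8", so both calls yield the same bytes, and in both cases the key is still an Array[Byte]. A quick standalone check (the object name is illustrative):

```scala
import java.nio.charset.Charset

object CharsetCheck {
  val viaAlias: Array[Byte] = "2".getBytes("UTF8")
  val viaCanonical: Array[Byte] = "2".getBytes("UTF-8")

  // Both names resolve to the same charset.
  val sameCharset: Boolean = Charset.forName("UTF8") == Charset.forName("UTF-8")

  // Compare contents: == on arrays only compares references.
  val sameBytes: Boolean = java.util.Arrays.equals(viaAlias, viaCanonical)
}
```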

Re: Partition Key Cannot be Send Out by Producer

Posted by Harsha <ka...@harsha.io>.
Hi Haoming,
          Take a look at the code here:
          https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
Your partKey should be a String, and when you convert it into a
byte array you can use partKey.getBytes("UTF8").
-Harsha
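A String key also helps if messages are routed by hashing the key. A hash partitioner usually computes something like a non-negative key.hashCode modulo the partition count; my understanding is that Kafka 0.8's default partitioner works this way, but treat that as an assumption. String.hashCode depends on the characters, while an Array[Byte] keeps Object's identity-based hashCode, so two arrays holding the same bytes will generally map to different partitions. In this sketch, partitionFor and partitionForBytes are hypothetical names:

```scala
object PartitionSketch {
  // Mask the sign bit so the result is non-negative even for Int.MinValue
  // (math.abs(Int.MinValue) would stay negative).
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode() & 0x7fffffff) % numPartitions

  // Content-based alternative for byte-array keys.
  def partitionForBytes(key: Array[Byte], numPartitions: Int): Int =
    (java.util.Arrays.hashCode(key) & 0x7fffffff) % numPartitions
}
```

With 4 partitions, partitionFor("2", 4) returns 2, since "2".hashCode is 50. A custom partitioner that receives byte-array keys would need a content-based hash such as java.util.Arrays.hashCode to route equal keys consistently.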

On Thu, Nov 20, 2014, at 03:57 PM, Haoming Zhang wrote:
> Hi all,
> 
> I'm a beginner of Kafka, currently I'm stuck by how to send out a
> KeyedMessage by producer. I would like to design a partition
> function to route the message based on the key, but the producer cannot
> send the KeyedMessage, and I got this exception:
> java.lang.ClassCastException: [B cannot be cast to java.lang.String
> 
> What I tried was to hardcode a partition key (I tried String and Integer;
> currently it is an Integer), then convert the partition key to a byte array:
>           val converter = new DataTypeConvertion
>           val hardKey = 2
>           val partkey = converter.intToByteArray(hardKey)
> 
> Then I create a KeyedMessage with the following function:
> 
>   private def toMessage(value: Val, key: Option[Key] = None,
>                         topic: Option[String] = None): KeyedMessage[Key, Val] = {
>     val t = topic.getOrElse("")  // topic.get would throw before the require check
>     require(!t.isEmpty, "Topic must not be empty")
>     key match {
>       case Some(key) => new KeyedMessage(t, key, value)
>       case _ => new KeyedMessage(t, value)
>     }
>   }
> 
> Then I try to send the KeyedMessage with a Kafka producer:
> 
>   def send(key: Key, value: Val, topic: Option[String] = None): Unit = {
>     val msg = toMessage(value, Option(key), topic)
>     println(msg)
>     println("msg.key: " + msg.key)
>     println("msg.message: " + msg.message)
>     println("msg.partKey: " + msg.partKey)
>     println("msg.topic: " + msg.topic)
>     try {
>       // p is an instance of Producer; the exception is thrown here
>       p.send(msg)
>     } catch {
>       case e: Exception =>
>         e.printStackTrace()
>         System.exit(1)
>     }
>   }
> 
> As you can see, I added many print statements to the function above;
> this is their output:
> KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
> msg.key: [B@7ad40950
> msg.message: [B@7ce18764
> msg.partKey: [B@7ad40950
> msg.topic: testingInput
> 
> The key of the KeyedMessage is displayed as [B@7ad40950. I think that is
> a memory address, and the exception (java.lang.ClassCastException: [B
> cannot be cast to java.lang.String) happens when the "send" function tries
> to convert the byte array to a String.
> 
> Am I wrong to create the key as a byte array?
> Some examples of how to use KeyedMessage would be great!
> 
> Regards,
> Haoming
> 
>
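One plausible cause of this particular exception, offered as an assumption since the producer setup is not shown: the producer's key encoder expects String while the key is an Array[Byte]. With the Kafka 0.8 Scala producer, the encoders are chosen through serializer.class and key.serializer.class, with kafka.serializer.DefaultEncoder handling Array[Byte] and kafka.serializer.StringEncoder handling String; check these names against the version in use. The sketch only builds java.util.Properties so that the key type and key encoder stay consistent; ProducerProps is a hypothetical name and "localhost:9092" is a placeholder broker address:

```scala
import java.util.Properties

object ProducerProps {
  // Keys and values both sent as Array[Byte]: byte-array encoder for both.
  def forByteKeys(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("metadata.broker.list", brokers)
    props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder")
    props.setProperty("key.serializer.class", "kafka.serializer.DefaultEncoder")
    props
  }

  // Keys kept as String (values still Array[Byte]): string encoder for keys only.
  def forStringKeys(brokers: String): Properties = {
    val props = forByteKeys(brokers)
    props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder")
    props
  }
}
```

The resulting Properties would be passed to new ProducerConfig(props) to build a Producer[Array[Byte], Array[Byte]] for byte keys, or a Producer[String, Array[Byte]] for String keys.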