Posted to dev@camel.apache.org by sat <sa...@yahoo.com> on 2014/09/12 19:20:22 UTC
Re: Camel-Kafka Component
I've tried to give the Kafka component a whirl (disclaimer: I am new to both
Camel and Kafka).
I tried the following and got an exception when sending a message to Kafka.
In my Spring class that produces the message:
    @Autowired
    private ProducerTemplate template;

    public void sendKakfaMessage(Integer id) {
        template.sendBodyAndHeader("direct:testingKafka",
                String.valueOf(id), KafkaConstants.PARTITION_KEY, "1");
    }
In my route, defined in a configuration class:
    @Bean
    public RouteBuilder routeBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:testingKafka").to("kafka:localhost:9092?topic=test");
            }
        };
    }
This gives me the exception below:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:113)
--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Kafka-Component-tp5749525p5756454.html
Sent from the Camel Development mailing list archive at Nabble.com.
Re: Camel-Kafka Component
Posted by sat <sa...@yahoo.com>.
I am answering my own question.
It looks like we have to explicitly specify the serializerClass for the
producer, like this:
from("direct:testingKafka").to("kafka:localhost:9092?topic=test&serializerClass=kafka.serializer.StringEncoder")
This worked.
It would be nice if Camel handled this itself, if possible. I don't know enough
about Camel or Kafka at the moment to submit a patch. I would like feedback, and
I'll see if I can bake it into a pull request.
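For anyone hitting the same trace, here is a minimal plain-Java sketch of what seems to be going on. The trace shows kafka.serializer.DefaultEncoder failing to cast a String to [B (that is, byte[]), which suggests the default encoder expects a byte array body, while StringEncoder accepts a String. The Encoder, PassThroughEncoder, and Utf8StringEncoder types below are hypothetical stand-ins written only for illustration; they are not Kafka's or Camel's classes, and the real implementations may differ.

```java
import java.nio.charset.StandardCharsets;

public class EncoderSketch {

    // Hypothetical stand-in for an encoder contract; not Kafka's actual API.
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    // Assumption: mimics a default encoder that passes byte[] through unchanged.
    static class PassThroughEncoder implements Encoder<byte[]> {
        @Override
        public byte[] toBytes(byte[] value) {
            return value;
        }
    }

    // Assumption: mimics a string encoder that converts text to UTF-8 bytes.
    static class Utf8StringEncoder implements Encoder<String> {
        @Override
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Simulates a producer whose encoder is chosen at runtime (for example from
    // a config string), so the compiler cannot check the payload type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] serialize(Encoder encoder, Object payload) {
        return encoder.toBytes(payload);
    }

    public static void main(String[] args) {
        String body = String.valueOf(42);

        try {
            // A String body with the pass-through encoder fails at the cast to byte[].
            serialize(new PassThroughEncoder(), body);
        } catch (ClassCastException e) {
            System.out.println("pass-through encoder rejected a String body");
        }

        // The same body with a string-aware encoder is converted without error.
        byte[] encoded = serialize(new Utf8StringEncoder(), body);
        System.out.println("string encoder produced " + encoded.length + " bytes");
    }
}
```

If this reading is right, the serializerClass option works because it swaps in an encoder that matches the String body. Sending a byte[] body with the default encoder might work too, but I have not verified that against the component.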
--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Kafka-Component-tp5749525p5756455.html