Posted to dev@camel.apache.org by sat <sa...@yahoo.com> on 2014/09/12 19:20:22 UTC

Re: Camel-Kafka Component

I gave the Kafka component a whirl (disclaimer: I am new to both Camel and
Kafka).

I tried the following and got an exception when sending a message to Kafka.

In my Spring class that produces the message:

    @Autowired
    private ProducerTemplate template;

    public void sendKakfaMessage(Integer id) {
        template.sendBodyAndHeader("direct:testingKafka",
                String.valueOf(id), KafkaConstants.PARTITION_KEY, "1");
    }


My route, defined in a configuration class:

    @Bean
    public RouteBuilder routeBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:testingKafka").to("kafka:localhost:9092?topic=test");
            }
        };
    }


This gives me the exception below:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
	at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
	at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:128)
	at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
	at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
	at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:113)



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Kafka-Component-tp5749525p5756454.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: Camel-Kafka Component

Posted by sat <sa...@yahoo.com>.
I am answering my own question:

It looks like we have to specify the serializerClass for the producer
explicitly, like this:

from("direct:testingKafka").to("kafka:localhost:9092?topic=test&serializerClass=kafka.serializer.StringEncoder")

This worked. 
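
For anyone hitting the same trace, here is a minimal sketch of why the cast
fails. It is plain Java with no Kafka or Camel dependency. The two methods are
hypothetical stand-ins, not the real Kafka classes: passThroughEncode assumes
(based on the stack trace above) that the default encoder expects a byte[]
and returns it unchanged, and stringEncode assumes a string encoder that turns
the String into bytes (UTF-8 is chosen here only for illustration).

```java
import java.nio.charset.StandardCharsets;

public class EncoderCastSketch {

    // Hypothetical stand-in for a pass-through encoder: it assumes the
    // payload is already a byte[] and returns it unchanged.
    static byte[] passThroughEncode(Object payload) {
        // A String payload fails here with ClassCastException,
        // which is the same kind of failure as in the trace.
        return (byte[]) payload;
    }

    // Hypothetical stand-in for a string encoder: it converts the
    // String payload to bytes instead of casting it.
    static byte[] stringEncode(Object payload) {
        return ((String) payload).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Same kind of body the route receives: a String built from an Integer.
        String body = String.valueOf(42);

        try {
            passThroughEncode(body);
        } catch (ClassCastException e) {
            System.out.println("pass-through encoder failed: " + e.getMessage());
        }

        byte[] encoded = stringEncode(body);
        System.out.println("string encoder produced " + encoded.length + " bytes");
    }
}
```

In the route above the body is a String (String.valueOf(id)), so an encoder
that matches the body type is needed, which is what the serializerClass option
selects.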

It would be nice if Camel handled this by default, if possible. I don't know
enough about Camel or Kafka at the moment to submit a patch. I would like
feedback, and I'll see if I can bake it into a pull request.



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Kafka-Component-tp5749525p5756455.html