Posted to user@storm.apache.org by Emmanuel <el...@msn.com> on 2015/03/19 20:58:23 UTC

Kafka Bolt - pushing byte array

Hello
I'm reading data from Kafka formatted as a Protobuf object (it comes out as a byte[]).
This works fine and I can read / decode the data, but when I try to push back to the queue, declaring the KafkaBolt without any type parameters seems to require a String object that then gets encoded:

KafkaBolt kafkaBolt = new KafkaBolt()
        .withTopicSelector(new DefaultTopicSelector("topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

If I try to apply type parameters to the constructor like this:

KafkaBolt kafkaBolt = new KafkaBolt<String, byte[]>()
        .withTopicSelector(new DefaultTopicSelector("topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, byte[]>());

it still seems to expect a String. So I tried to encode the bytes as a String in the previous bolt with:

byte[] byteArray = input.getBinary(0);
String output;
try {
    output = new String(byteArray, "ISO-8859-1");
    _collector.emit(new Values(this.topic, output, "key"));
} catch (...) {}

Now the KafkaBolt is able to send to the queue, but it fails on some objects, and the Protobuf decoder on the other side spits out lots of errors. I used ISO-8859-1 because I read it is a 1-1 mapping from binary, but I have no idea what encoding the KafkaBolt uses on the other side, and that could well be the problem.
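For what it's worth, ISO-8859-1 does round-trip bytes losslessly in Java; the corruption more likely happens if the producer re-encodes the String with a different charset (as far as I can tell, the old kafka.serializer.StringEncoder defaults to UTF-8 unless serializer.encoding is set). A small standalone sketch of the difference, with an arbitrary made-up payload:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharsetRoundTrip {
    public static void main(String[] args) {
        // Arbitrary binary payload containing a byte above 0x7F
        byte[] original = {(byte) 0x89, 0x50, 0x4E, 0x47};

        // Decoding with ISO-8859-1 maps each byte 1-1 to a char...
        String asString = new String(original, StandardCharsets.ISO_8859_1);

        // ...so re-encoding with ISO-8859-1 restores the exact bytes
        System.out.println(Arrays.equals(original,
                asString.getBytes(StandardCharsets.ISO_8859_1))); // true

        // But re-encoding with UTF-8 expands every byte >= 0x80 into two bytes
        System.out.println(Arrays.equals(original,
                asString.getBytes(StandardCharsets.UTF_8))); // false
    }
}
```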
Ideally I want to avoid the String encoding / decoding altogether, so how do I specify the type of the 'message' to the KafkaBolt?
What am I doing wrong?

Thanks for the help.
Regards

RE: Kafka Bolt - pushing byte array

Posted by Emmanuel <el...@msn.com>.
I figured out my mistake. The property

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

was set to StringEncoder instead of the DefaultEncoder, which takes a byte[]. However, the key also needs to be a byte[], so I had to change that too.
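For anyone hitting the same thing, here is roughly the wiring that results, assuming the old storm-kafka KafkaBolt and 0.8-style producer properties. This is a sketch, not a tested topology; the broker address, topic name, and class names are placeholders for your own setup:

```java
import java.util.Properties;

import backtype.storm.Config;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

public class ByteArrayKafkaBoltWiring {

    public static Config buildConfig() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder broker
        // DefaultEncoder passes byte[] through untouched, for both key and message
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.DefaultEncoder");

        Config conf = new Config();
        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
        return conf;
    }

    public static KafkaBolt<byte[], byte[]> buildBolt() {
        // The upstream bolt must now emit byte[] for both the key and message fields
        return new KafkaBolt<byte[], byte[]>()
                .withTopicSelector(new DefaultTopicSelector("topic"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<byte[], byte[]>());
    }
}
```

With both serializers set to DefaultEncoder, no String conversion happens anywhere, so the Protobuf bytes arrive on the consumer side exactly as emitted.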