Posted to dev@kafka.apache.org by "Sam Meder (JIRA)" <ji...@apache.org> on 2013/06/18 20:30:20 UTC
[jira] [Created] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum
Sam Meder created KAFKA-946:
-------------------------------
Summary: Kafka Hadoop Consumer fails when verifying message checksum
Key: KAFKA-946
URL: https://issues.apache.org/jira/browse/KAFKA-946
Project: Kafka
Issue Type: Bug
Components: contrib
Affects Versions: 0.8
Reporter: Sam Meder
Priority: Critical
The consumer tries to verify each message's checksum, but verification fails because the checksum is recomputed over different bytes than the ones it was originally computed over. In KafkaETLContext:
protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
    if (_messageIt != null && _messageIt.hasNext()) {
        MessageAndOffset messageAndOffset = _messageIt.next();
        // Only the payload is copied into the value...
        ByteBuffer buf = messageAndOffset.message().payload();
        int origSize = buf.remaining();
        byte[] bytes = new byte[origSize];
        // ByteBuffer.get(dst, offset, length): offset indexes into `bytes`, not `buf`,
        // so this call is only correct when buf.position() is 0.
        buf.get(bytes, buf.position(), origSize);
        value.set(bytes, 0, origSize);
        // ...while the key carries the checksum of the complete original message.
        key.set(_index, _offset, messageAndOffset.message().checksum());
        _offset = messageAndOffset.nextOffset(); // increase offset
        _count++;                                // increase count
        return true;
    }
    return false;
}
Note that only the message payload is copied into the value, while the original message checksum is stored in the key. Then, in SimpleKafkaETLMapper:
@Override
public void map(KafkaETLKey key, BytesWritable val,
                OutputCollector<LongWritable, Text> collector,
                Reporter reporter) throws IOException {
    byte[] bytes = KafkaETLUtils.getBytes(val);
    // check the checksum of the message (`bytes` holds only the payload)
    Message message = new Message(bytes);
    long checksum = key.getChecksum();
    if (checksum != message.checksum())
        throw new IOException("Invalid message checksum "
                + message.checksum() + ". Expected " + key + ".");
    // ...
}
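Here the Message object is constructed from the payload bytes alone, so a new checksum is computed over a message that contains only the payload. The original message checksum, however, also covers the message key, so the recomputed checksum does not match the one stored in the key and verification fails.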
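To make the failure concrete, here is a minimal, self-contained Java sketch. It does not use Kafka's classes or Kafka's real message format: it assumes a simplified, hypothetical record layout (key length, key, payload length, payload) protected by a single `java.util.zip.CRC32`, and the class and method names are made up for illustration. `recomputedFromPayloadOnly` mirrors what the mapper effectively does (rebuild a record from the payload alone and checksum that), while `verifyFullRecord` shows one possible fix direction under these assumptions: hand the mapper the full serialized bytes and verify over exactly those.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Sketch of the checksum mismatch using a SIMPLIFIED, hypothetical layout:
 * [keyLength:int][key bytes][payloadLength:int][payload bytes].
 * This is not Kafka's wire format; it only models "the stored checksum covers the key too".
 */
final class ChecksumMismatchDemo {

    // Serialize key and payload into the simplified layout (all bytes the checksum covers).
    static byte[] serialize(byte[] key, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + key.length + 4 + payload.length);
        buf.putInt(key.length).put(key);
        buf.putInt(payload.length).put(payload);
        return buf.array();
    }

    // CRC32 over the given bytes, as an unsigned 32-bit value held in a long.
    static long crc32(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    // Producer side (assumed): the stored checksum covers key AND payload.
    static long storedChecksum(byte[] key, byte[] payload) {
        return crc32(serialize(key, payload));
    }

    // Mapper side (as in the report): rebuild a record from the payload alone, then checksum it.
    static long recomputedFromPayloadOnly(byte[] payload) {
        return crc32(serialize(new byte[0], payload));
    }

    // Possible fix (assumption): verify over the exact bytes the checksum was computed on.
    static boolean verifyFullRecord(byte[] fullRecord, long expected) {
        return crc32(fullRecord) == expected;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        long expected = storedChecksum(key, payload);
        System.out.println("payload-only check passes: "
                + (recomputedFromPayloadOnly(payload) == expected));
        System.out.println("full-record check passes:  "
                + verifyFullRecord(serialize(key, payload), expected));
    }
}
```

In this model the payload-only comparison agrees only when the original record had an empty key, which is consistent with the diagnosis above that the key is the part missing from the recomputation.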