Posted to dev@kafka.apache.org by "Sam Meder (JIRA)" <ji...@apache.org> on 2013/06/18 20:30:20 UTC

[jira] [Created] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum

Sam Meder created KAFKA-946:
-------------------------------

             Summary: Kafka Hadoop Consumer fails when verifying message checksum
                 Key: KAFKA-946
                 URL: https://issues.apache.org/jira/browse/KAFKA-946
             Project: Kafka
          Issue Type: Bug
          Components: contrib
    Affects Versions: 0.8
            Reporter: Sam Meder
            Priority: Critical


The code tries to verify each message's checksum, but verification fails because the checksum stored with the record and the one recomputed in the mapper are calculated over different data. In KafkaETLContext:

    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
        if (_messageIt != null && _messageIt.hasNext()) {
            MessageAndOffset messageAndOffset = _messageIt.next();

            ByteBuffer buf = messageAndOffset.message().payload();
            int origSize = buf.remaining();
            byte[] bytes = new byte[origSize];
            buf.get(bytes, buf.position(), origSize);
            value.set(bytes, 0, origSize);

            key.set(_index, _offset, messageAndOffset.message().checksum());

            _offset = messageAndOffset.nextOffset();  // increase offset
            _count++;  // increase count

            return true;
        }
        else return false;
    }

Note that only the message payload is copied into the value, while the checksum of the whole message is stored in the key. Then, in SimpleKafkaETLMapper:

    @Override
    public void map(KafkaETLKey key, BytesWritable val,
            OutputCollector<LongWritable, Text> collector,
            Reporter reporter) throws IOException {

        byte[] bytes = KafkaETLUtils.getBytes(val);

        // check the checksum of message
        Message message = new Message(bytes);
        long checksum = key.getChecksum();
        if (checksum != message.checksum())
            throw new IOException("Invalid message checksum "
                    + message.checksum() + ". Expected " + key + ".");

the Message object is initialized with the payload bytes alone, so a new checksum is calculated over the payload only. The problem is that the original message checksum also covers the message key, so the two checksums do not match and verification fails.
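The mismatch can be reproduced without Hadoop or a broker using java.util.zip.CRC32. This is a simplified model, not Kafka code: it assumes the 0.8 layout in which the CRC covers the magic byte, the attributes byte, a length-prefixed key (length -1 when there is no key) and a length-prefixed payload, and it assumes the Message(byte[]) constructor used by the mapper builds an unkeyed message around the given bytes. The class and method names (ChecksumMismatchDemo, messageCrc) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumMismatchDemo {

    // Simplified model (assumed 0.8 layout) of the bytes the message CRC covers:
    // magic (1) | attributes (1) | key length (int32, -1 if null) | key
    //   | payload length (int32) | payload
    static long messageCrc(byte[] key, byte[] payload) {
        int keyLen = (key == null) ? 0 : key.length;
        ByteBuffer body = ByteBuffer.allocate(1 + 1 + 4 + keyLen + 4 + payload.length);
        body.put((byte) 0);                          // magic byte (assumed 0)
        body.put((byte) 0);                          // attributes: no compression
        body.putInt(key == null ? -1 : key.length);  // key length prefix
        if (key != null) {
            body.put(key);                           // key bytes
        }
        body.putInt(payload.length);                 // payload length prefix
        body.put(payload);                           // payload bytes

        CRC32 crc = new CRC32();
        crc.update(body.array(), 0, body.position());
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);   // hypothetical key
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8); // hypothetical payload

        // Models what KafkaETLContext stores in KafkaETLKey: CRC of the keyed message.
        long stored = messageCrc(key, payload);
        // Models what the mapper recomputes: new Message(bytes) wraps the payload with no key.
        long recomputed = messageCrc(null, payload);

        System.out.println("stored     = " + stored);
        System.out.println("recomputed = " + recomputed);
        System.out.println("match      = " + (stored == recomputed));
    }
}
```

In this model the two values agree only when the original message has no key, because the inputs to the CRC are then identical. A fix would need both sides to checksum the same bytes, for example by storing a checksum of the copied payload in the key or by not re-wrapping the payload in a new Message in the mapper; neither approach is part of the original report.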

