Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/09/24 17:34:02 UTC

[jira] [Closed] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum

     [ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao closed KAFKA-946.
-------------------------

    
> Kafka Hadoop Consumer fails when verifying message checksum
> -----------------------------------------------------------
>
>                 Key: KAFKA-946
>                 URL: https://issues.apache.org/jira/browse/KAFKA-946
>             Project: Kafka
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Critical
>             Fix For: 0.8
>
>         Attachments: hadoop_consumer_1.patch
>
>
> The Hadoop consumer tries to verify each message's checksum, but verification fails because the checksum is recomputed over different data than the original. In KafkaETLContext:
>     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
>         if (_messageIt != null && _messageIt.hasNext()) {
>             MessageAndOffset messageAndOffset = _messageIt.next();
>             ByteBuffer buf = messageAndOffset.message().payload();
>             int origSize = buf.remaining();
>             byte[] bytes = new byte[origSize];
>             buf.get(bytes, buf.position(), origSize);
>             value.set(bytes, 0, origSize);
>             key.set(_index, _offset, messageAndOffset.message().checksum());
>             _offset = messageAndOffset.nextOffset();  // increase offset
>             _count++;  // increase count
>             return true;
>         }
>         else return false;
>     }
> Note that only the message payload is copied into the value, while the checksum of the whole message is stored in the key. Then in SimpleKafkaETLMapper:
>     @Override
>     public void map(KafkaETLKey key, BytesWritable val,
>             OutputCollector<LongWritable, Text> collector,
>             Reporter reporter) throws IOException {
>         byte[] bytes = KafkaETLUtils.getBytes(val);
>         // check the checksum of message
>         Message message = new Message(bytes);
>         long checksum = key.getChecksum();
>         if (checksum != message.checksum())
>             throw new IOException("Invalid message checksum "
>                                             + message.checksum() + ". Expected " + key + ".");
> Here the Message object is initialized with only the payload bytes, and a new checksum is calculated from them. The problem is that the original message checksum also covers the message key, so checksum verification fails.
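
The mismatch described above can be reproduced without Kafka. The following is only a minimal sketch: it assumes a simplified model in which the checksum is a CRC32 over the key bytes followed by the payload bytes, and it ignores the other header fields a real Kafka message carries. The class name ChecksumMismatchDemo, the helper crcOf, and the sample key and payload are hypothetical and do not come from the Kafka code base.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical, simplified illustration; not Kafka's actual message format.
public class ChecksumMismatchDemo {

    // CRC32 over the concatenation of the given byte arrays, in order.
    static long crcOf(byte[]... parts) {
        CRC32 crc = new CRC32();
        for (byte[] part : parts) {
            crc.update(part, 0, part.length);
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Producer side (simplified): the checksum covers key and payload.
        long original = crcOf(key, payload);

        // Mapper side (simplified): only the payload survived, so the
        // checksum is recomputed over less data than the original.
        long recomputed = crcOf(payload);

        System.out.println("original   = " + original);
        System.out.println("recomputed = " + recomputed);
        System.out.println("match      = " + (original == recomputed));
    }
}
```

Under this model the two values agree only when the recomputation sees the same bytes as the original checksum, which suggests two possible directions: carry every checksummed field through to the mapper, or skip re-verification there.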
