Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/09/23 18:43:02 UTC
[jira] [Commented] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum
[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13774707#comment-13774707 ]
Jun Rao commented on KAFKA-946:
-------------------------------
Sorry for not looking at this earlier. I think we can include the fix in 0.8 since it's simple enough. The patch doesn't apply to 0.8 though. Could you rebase?
> Kafka Hadoop Consumer fails when verifying message checksum
> -----------------------------------------------------------
>
> Key: KAFKA-946
> URL: https://issues.apache.org/jira/browse/KAFKA-946
> Project: Kafka
> Issue Type: Bug
> Components: contrib
> Affects Versions: 0.8
> Reporter: Sam Meder
> Priority: Critical
> Fix For: 0.8
>
> Attachments: hadoop_consumer.patch
>
>
> The code tries to verify the checksum, but verification fails because the checksum is recomputed over different data than the original. In KafkaETLContext:
>     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
>         if (_messageIt != null && _messageIt.hasNext()) {
>             MessageAndOffset messageAndOffset = _messageIt.next();
>             ByteBuffer buf = messageAndOffset.message().payload();
>             int origSize = buf.remaining();
>             byte[] bytes = new byte[origSize];
>             buf.get(bytes, buf.position(), origSize);
>             value.set(bytes, 0, origSize);
>             key.set(_index, _offset, messageAndOffset.message().checksum());
>             _offset = messageAndOffset.nextOffset(); //increase offset
>             _count ++; //increase count
>             return true;
>         }
>         else return false;
>     }
> Note that only the message payload is copied into the value, while the checksum of the whole message is stored in the key. Then in SimpleKafkaETLMapper:
>     @Override
>     public void map(KafkaETLKey key, BytesWritable val,
>             OutputCollector<LongWritable, Text> collector,
>             Reporter reporter) throws IOException {
>         byte[] bytes = KafkaETLUtils.getBytes(val);
>         //check the checksum of message
>         Message message = new Message(bytes);
>         long checksum = key.getChecksum();
>         if (checksum != message.checksum())
>             throw new IOException ("Invalid message checksum "
>                 + message.checksum() + ". Expected " + key + ".");
> The Message object is initialized with the payload bytes only, and a new checksum is calculated from them. The problem is that the original message checksum also covers the message key, so checksum verification fails.
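The mismatch described above can be reproduced without Kafka at all. The following is only a minimal sketch under stated assumptions: ChecksumMismatchSketch and its crc helper are hypothetical names, and a plain CRC32 over "key bytes followed by payload bytes" is a simplified stand-in for the real message layout, not Kafka's actual wire format. It shows that a checksum computed over key plus payload will generally not equal one recomputed from the payload alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical stand-in for illustration only; this is not a Kafka class.
final class ChecksumMismatchSketch {

    // CRC32 over the concatenation of all given byte arrays, in order.
    static long crc(byte[]... parts) {
        CRC32 crc = new CRC32();
        for (byte[] part : parts) {
            crc.update(part, 0, part.length);
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "some-key".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "some-payload".getBytes(StandardCharsets.UTF_8);

        // Assumption: the original checksum covers the key as well as the payload.
        long original = crc(key, payload);
        // The mapper only has the payload bytes, so it can only recompute this.
        long recomputed = crc(payload);

        System.out.println("original   = " + original);
        System.out.println("recomputed = " + recomputed);
        System.out.println("match      = " + (original == recomputed));
    }
}
```

If this simplified model holds, comparing the stored checksum against one rebuilt from the payload bytes cannot succeed for keyed messages, whatever the payload contains.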