You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/24 17:31:26 UTC

git commit: kafka-946; Kafka Hadoop Consumer fails when verifying message checksum; patched by Sam Meder; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 61b8b2bf8 -> bb7b45cd5


kafka-946; Kafka Hadoop Consumer fails when verifying message checksum; patched by Sam Meder; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb7b45cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb7b45cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb7b45cd

Branch: refs/heads/0.8
Commit: bb7b45cd5d454d64c3454b01f8f3f1e13ed26ff3
Parents: 61b8b2b
Author: Sam Meder <sa...@gmail.com>
Authored: Tue Sep 24 08:31:18 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Sep 24 08:31:18 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/kafka/etl/KafkaETLContext.java              | 2 +-
 .../src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java    | 7 ++++---
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb7b45cd/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index 8e98efc..1d0e0a9 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -196,7 +196,7 @@ public class KafkaETLContext {
         if (_messageIt != null && _messageIt.hasNext()) {
             MessageAndOffset messageAndOffset = _messageIt.next();
             
-            ByteBuffer buf = messageAndOffset.message().payload();
+            ByteBuffer buf = messageAndOffset.message().buffer();
             int origSize = buf.remaining();
             byte[] bytes = new byte[origSize];
           buf.get(bytes, buf.position(), origSize);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb7b45cd/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
index b0aadff..45cc921 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
@@ -16,8 +16,6 @@
  */
 package kafka.etl.impl;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLUtils;
 import kafka.message.Message;
@@ -29,6 +27,9 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 /**
  * Simple implementation of KafkaETLMapper. It assumes that 
  * input data are text timestamp (long).
@@ -59,7 +60,7 @@ Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
         byte[] bytes = KafkaETLUtils.getBytes(val);
         
         //check the checksum of message
-        Message message = new Message(bytes);
+        Message message = new Message(ByteBuffer.wrap(bytes));
         long checksum = key.getChecksum();
         if (checksum != message.checksum()) 
             throw new IOException ("Invalid message checksum "