You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/28 06:56:19 UTC

git commit: kafka-1140; Move the decoding logic from ConsumerIterator.makeNext to next; patched by Guozhang Wang; reviewed by Jun Rao

Updated Branches:
  refs/heads/trunk ac239da50 -> df288b75a


kafka-1140; Move the decoding logic from ConsumerIterator.makeNext to next; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df288b75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df288b75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df288b75

Branch: refs/heads/trunk
Commit: df288b75a0c6685deeda99ce4db3e17fff39b0ad
Parents: ac239da
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Nov 27 21:56:55 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 27 21:56:55 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsumerIterator.scala |  5 +--
 .../kafka/message/MessageAndMetadata.scala      | 15 +++++++-
 .../kafka/consumer/ConsumerIteratorTest.scala   | 36 ++++++++++++++++++++
 3 files changed, 51 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/df288b75/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index a4227a4..ac491b4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -101,10 +101,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
-    val keyBuffer = item.message.key
-    val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
-    val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
-    new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
+    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
   }
 
   def clearCurrentChunk() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/df288b75/core/src/main/scala/kafka/message/MessageAndMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index 20c0e70..d693abc 100644
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -17,5 +17,18 @@
 
 package kafka.message
 
-case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)
+import kafka.serializer.Decoder
+import kafka.utils.Utils
+
+case class MessageAndMetadata[K, V](topic: String, partition: Int,
+                                    private val rawMessage: Message, offset: Long,
+                                    keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
+
+  /**
+   * Return the decoded message key and payload
+   */
+  def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key))
+
+  def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload))
+}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/df288b75/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ef1de83..9347ea6 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -88,4 +88,40 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
     val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
     assertEquals(unconsumed, receivedMessages)
   }
+
+  @Test
+  def testConsumerIteratorDecodingFailure() {
+    val messageStrings = (0 until 10).map(_.toString).toList
+    val messages = messageStrings.map(s => new Message(s.getBytes))
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*)
+
+    topicInfos(0).enqueue(messageSet)
+    assertEquals(1, queue.size)
+
+    val iter = new ConsumerIterator[String, String](queue,
+      ConsumerConfig.ConsumerTimeoutMs,
+      new FailDecoder(),
+      new FailDecoder(),
+      clientId = "")
+
+    val receivedMessages = (0 until 5).map{ i =>
+      assertTrue(iter.hasNext)
+      val message = iter.next
+      assertEquals(message.offset, i + consumedOffset)
+
+      try {
+        message.message // should fail
+      }
+      catch {
+        case e: UnsupportedOperationException => // this is ok
+        case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+      }
+    }
+  }
+
+  class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] {
+    def fromBytes(bytes: Array[Byte]): String = {
+      throw new UnsupportedOperationException("This decoder does not work at all..")
+    }
+  }
 }