You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/28 06:56:19 UTC
git commit: kafka-1140;
Move the decoding logic from ConsumerIterator.makeNext to next;
patched by Guozhang Wang; reviewed by Jun Rao
Updated Branches:
refs/heads/trunk ac239da50 -> df288b75a
kafka-1140; Move the decoding logic from ConsumerIterator.makeNext to next; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df288b75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df288b75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df288b75
Branch: refs/heads/trunk
Commit: df288b75a0c6685deeda99ce4db3e17fff39b0ad
Parents: ac239da
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Nov 27 21:56:55 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 27 21:56:55 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/consumer/ConsumerIterator.scala | 5 +--
.../kafka/message/MessageAndMetadata.scala | 15 +++++++-
.../kafka/consumer/ConsumerIteratorTest.scala | 36 ++++++++++++++++++++
3 files changed, 51 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/df288b75/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index a4227a4..ac491b4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -101,10 +101,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
item.message.ensureValid() // validate checksum of message to ensure it is valid
- val keyBuffer = item.message.key
- val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
- val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
- new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
+ new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}
def clearCurrentChunk() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/df288b75/core/src/main/scala/kafka/message/MessageAndMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index 20c0e70..d693abc 100644
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -17,5 +17,18 @@
package kafka.message
-case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)
+import kafka.serializer.Decoder
+import kafka.utils.Utils
+
+case class MessageAndMetadata[K, V](topic: String, partition: Int,
+ private val rawMessage: Message, offset: Long,
+ keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
+
+ /**
+ * Return the decoded message key and payload
+ */
+ def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key))
+
+ def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload))
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/df288b75/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ef1de83..9347ea6 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -88,4 +88,40 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
assertEquals(unconsumed, receivedMessages)
}
+
+ @Test
+ def testConsumerIteratorDecodingFailure() {
+ val messageStrings = (0 until 10).map(_.toString).toList
+ val messages = messageStrings.map(s => new Message(s.getBytes))
+ val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*)
+
+ topicInfos(0).enqueue(messageSet)
+ assertEquals(1, queue.size)
+
+ val iter = new ConsumerIterator[String, String](queue,
+ ConsumerConfig.ConsumerTimeoutMs,
+ new FailDecoder(),
+ new FailDecoder(),
+ clientId = "")
+
+ val receivedMessages = (0 until 5).map{ i =>
+ assertTrue(iter.hasNext)
+ val message = iter.next
+ assertEquals(message.offset, i + consumedOffset)
+
+ try {
+ message.message // should fail
+ }
+ catch {
+ case e: UnsupportedOperationException => // this is ok
+ case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+ }
+ }
+ }
+
+ class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] {
+ def fromBytes(bytes: Array[Byte]): String = {
+ throw new UnsupportedOperationException("This decoder does not work at all..")
+ }
+ }
}