Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/26 23:43:41 UTC

kafka git commit: KAFKA-527; Use in-place decompression enabled inner iterator to replace old decompress function; reviewed by Joel Koshy and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 5b42b538e -> d2f50fc38


KAFKA-527; Use in-place decompression enabled inner iterator to replace old decompress function; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2f50fc3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2f50fc3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2f50fc3

Branch: refs/heads/trunk
Commit: d2f50fc3886896bc569fea7fb308036008b89f94
Parents: 5b42b53
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 26 15:43:18 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 26 15:43:18 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsumerIterator.scala |  2 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  2 +-
 .../kafka/message/ByteBufferMessageSet.scala    | 62 ++++++++++++++------
 .../scala/kafka/tools/DumpLogSegments.scala     |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 10 ++--
 .../unit/kafka/producer/SyncProducerTest.scala  |  2 +-
 6 files changed, 52 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
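
For readers skimming the diff: the removed ByteBufferMessageSet.decompress inflated the entire compressed wrapper message into a new ByteBufferMessageSet before a single record could be read, while the new ByteBufferMessageSet.deepIterator streams the inner messages off the decompression stream one record at a time. A minimal sketch of the two call shapes, assuming the trunk API as of this commit (the varargs ByteBufferMessageSet constructor, shallowIterator, and DefaultCompressionCodec); the enclosing object and message contents are purely illustrative:

    import kafka.message.{ByteBufferMessageSet, DefaultCompressionCodec, Message}

    object DeepIteratorSketch {
      def main(args: Array[String]): Unit = {
        // Build a compressed set: its shallow iterator yields a single wrapper
        // message whose payload is the compressed inner message set.
        val compressedSet = new ByteBufferMessageSet(DefaultCompressionCodec,
          new Message("hello".getBytes("UTF-8")), new Message("there".getBytes("UTF-8")))
        val wrapper: Message = compressedSet.shallowIterator.next().message

        // Old call shape (removed by this patch): materialize the whole inner set first.
        //   val inner = ByteBufferMessageSet.decompress(wrapper).iterator
        // New call shape: decompress record by record while iterating.
        val inner = ByteBufferMessageSet.deepIterator(wrapper)
        inner.foreach { messageAndOffset =>
          val payload = messageAndOffset.message.payload
          val bytes = new Array[Byte](payload.remaining())
          payload.get(bytes)
          println(messageAndOffset.offset + " -> " + new String(bytes, "UTF-8"))
        }
      }
    }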


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 78fbf75..b00a4dc 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -37,7 +37,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              val clientId: String)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
-  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
   private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ac96434..0256764 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -182,7 +182,7 @@ class LogSegment(val log: FileMessageSet,
               case NoCompressionCodec =>
                 entry.offset
               case _ =>
-                ByteBufferMessageSet.decompress(entry.message).head.offset
+                ByteBufferMessageSet.deepIterator(entry.message).next().offset
           }
           index.append(startOffset, validBytes)
           lastIndexEntry = validBytes
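
Note on the change above: the index-rebuild path only needs the offset of the first inner message of a compressed entry, so with the streaming iterator only that first record is pulled off the decompression stream instead of inflating the whole wrapper payload as decompress() did. A hypothetical helper capturing the same pattern (FirstInnerOffset is illustrative, not part of the patch):

    import kafka.message.{ByteBufferMessageSet, Message}

    object FirstInnerOffset {
      // Offset of the first message inside a compressed wrapper; only the first
      // record is read from the decompression stream before we stop.
      def apply(wrapperMessage: Message): Long =
        ByteBufferMessageSet.deepIterator(wrapperMessage).next().offset
    }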

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2d6cfc0..9dfe914 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -17,12 +17,13 @@
 
 package kafka.message
 
-import kafka.utils.Logging
+import kafka.utils.{IteratorTemplate, Logging}
+import kafka.common.KafkaException
+
 import java.nio.ByteBuffer
 import java.nio.channels._
-import java.io.{InputStream, DataOutputStream}
+import java.io._
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.IteratorTemplate
 
 object ByteBufferMessageSet {
 
@@ -58,19 +59,42 @@ object ByteBufferMessageSet {
     }
   }
 
-  def decompress(message: Message): ByteBufferMessageSet = {
-    val outputStream = new BufferingOutputStream(math.min(math.max(message.size, 1024), 1 << 16))
-    val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
-    val compressed = CompressionFactory(message.compressionCodec, inputStream)
-    try {
-      outputStream.write(compressed)
-    } finally {
-      compressed.close()
+  /** Deep iterator that decompresses the message sets in-place. */
+  def deepIterator(wrapperMessage: Message): Iterator[MessageAndOffset] = {
+    new IteratorTemplate[MessageAndOffset] {
+
+      val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
+      val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+
+      override def makeNext(): MessageAndOffset = {
+        try {
+          // read the offset
+          val offset = compressed.readLong()
+          // read record size
+          val size = compressed.readInt()
+
+          if (size < Message.MinHeaderSize)
+            throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
+
+          // read the record into an intermediate record buffer
+          // and hence has to do extra copy
+          val bufferArray = new Array[Byte](size)
+          compressed.readFully(bufferArray, 0, size)
+          val buffer = ByteBuffer.wrap(bufferArray)
+
+          val newMessage = new Message(buffer)
+
+          // the decompressed message should not be a wrapper message since we do not allow nested compression
+          new MessageAndOffset(newMessage, offset)
+        } catch {
+          case eofe: EOFException =>
+            compressed.close()
+            allDone()
+          case ioe: IOException =>
+            throw new KafkaException(ioe)
+        }
+      }
     }
-    val outputBuffer = ByteBuffer.allocate(outputStream.size)
-    outputStream.writeTo(outputBuffer)
-    outputBuffer.rewind
-    new ByteBufferMessageSet(outputBuffer)
   }
 
   private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
@@ -150,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       var topIter = buffer.slice()
       var innerIter: Iterator[MessageAndOffset] = null
 
-      def innerDone():Boolean = (innerIter == null || !innerIter.hasNext)
+      def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext)
 
       def makeNextOuter: MessageAndOffset = {
         // if there isn't at least an offset and size, we are done
@@ -159,7 +183,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         val offset = topIter.getLong()
         val size = topIter.getInt()
         if(size < Message.MinHeaderSize)
-          throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
+          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")
 
         // we have an incomplete message
         if(topIter.remaining < size)
@@ -179,7 +203,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
               innerIter = null
               new MessageAndOffset(newMessage, offset)
             case _ =>
-              innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator()
+              innerIter = ByteBufferMessageSet.deepIterator(newMessage)
               if(!innerIter.hasNext)
                 innerIter = null
               makeNext()
@@ -194,7 +218,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           if(innerDone())
             makeNextOuter
           else
-            innerIter.next
+            innerIter.next()
         }
       }
 

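For readers unfamiliar with kafka.utils.IteratorTemplate, which the new deep iterator above extends: makeNext() either returns the next element (here, an offset, a size, and then size payload bytes read from the DataInputStream) or calls allDone() once the stream hits EOF, and IteratorTemplate derives the usual hasNext/next protocol from that. A toy sketch of the same contract, assuming the makeNext()/allDone() signatures used in the hunk above (CountdownIterator is illustrative only, not part of the patch):

    import kafka.utils.IteratorTemplate

    // Toy IteratorTemplate: makeNext() produces the next element, or calls
    // allDone() to mark the iterator exhausted; hasNext/next are derived from it.
    class CountdownIterator(start: Int) extends IteratorTemplate[Int] {
      private var remaining = start
      override def makeNext(): Int = {
        if (remaining <= 0)
          allDone()              // no more elements; hasNext will now return false
        else {
          remaining -= 1
          remaining
        }
      }
    }
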
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fe2cc11..b7a3630 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -180,7 +180,7 @@ object DumpLogSegments {
         case NoCompressionCodec =>
           getSingleMessageIterator(messageAndOffset)
         case _ =>
-          ByteBufferMessageSet.decompress(message).iterator
+          ByteBufferMessageSet.deepIterator(message)
       }
     } else
       getSingleMessageIterator(messageAndOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8cd5f2f..3c0599c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -269,13 +269,13 @@ class LogTest extends JUnitSuite {
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
 
-    def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
+    def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head.message)
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
-    assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
-    assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
-    assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
-    assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
+    assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
+    assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset)
+    assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset)
+    assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5208a5..812df59 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -31,7 +31,7 @@ import kafka.api.ProducerResponseStatus
 import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
-  private var messageBytes =  new Array[Byte](2);
+  private val messageBytes =  new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
   val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head))
   val zookeeperConnect = TestZKUtils.zookeeperConnect