Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/26 23:43:41 UTC
kafka git commit: KAFKA-527; Use in-place decompression enabled inner iterator to replace old decompress function; reviewed by Joel Koshy and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 5b42b538e -> d2f50fc38
KAFKA-527; Use in-place decompression enabled inner iterator to replace old decompress function; reviewed by Joel Koshy and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2f50fc3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2f50fc3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2f50fc3
Branch: refs/heads/trunk
Commit: d2f50fc3886896bc569fea7fb308036008b89f94
Parents: 5b42b53
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 26 15:43:18 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 26 15:43:18 2015 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/ConsumerIterator.scala | 2 +-
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
.../kafka/message/ByteBufferMessageSet.scala | 62 ++++++++++++++------
.../scala/kafka/tools/DumpLogSegments.scala | 2 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 10 ++--
.../unit/kafka/producer/SyncProducerTest.scala | 2 +-
6 files changed, 52 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
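A note on the change as a whole: the old ByteBufferMessageSet.decompress eagerly inflated the entire compressed wrapper message into a freshly allocated buffer and wrapped it in a new ByteBufferMessageSet; the new ByteBufferMessageSet.deepIterator instead streams inner messages straight out of the decompression stream, materializing one record per next() call. A minimal before/after sketch at a call site (wrapperMessage is a stand-in for any compressed wrapper Message):

    // before this commit: decompress everything up front, then iterate the result
    val eager: Iterator[MessageAndOffset] = ByteBufferMessageSet.decompress(wrapperMessage).iterator
    // after this commit: decompress lazily, one inner message per next()
    val lazily: Iterator[MessageAndOffset] = ByteBufferMessageSet.deepIterator(wrapperMessage)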
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 78fbf75..b00a4dc 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -37,7 +37,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
- private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+ private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ac96434..0256764 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -182,7 +182,7 @@ class LogSegment(val log: FileMessageSet,
case NoCompressionCodec =>
entry.offset
case _ =>
- ByteBufferMessageSet.decompress(entry.message).head.offset
+ ByteBufferMessageSet.deepIterator(entry.message).next().offset
}
index.append(startOffset, validBytes)
lastIndexEntry = validBytes
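The LogSegment change above relies on deepIterator being lazy: recovering the base offset of a compressed entry only needs the first inner message, so a single next() suffices and no further records are decoded. Sketched with the names from the hunk:

    // only the first inner record is decoded to recover the base offset
    val baseOffset = ByteBufferMessageSet.deepIterator(entry.message).next().offset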
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2d6cfc0..9dfe914 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -17,12 +17,13 @@
package kafka.message
-import kafka.utils.Logging
+import kafka.utils.{IteratorTemplate, Logging}
+import kafka.common.KafkaException
+
import java.nio.ByteBuffer
import java.nio.channels._
-import java.io.{InputStream, DataOutputStream}
+import java.io._
import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.IteratorTemplate
object ByteBufferMessageSet {
@@ -58,19 +59,42 @@ object ByteBufferMessageSet {
}
}
- def decompress(message: Message): ByteBufferMessageSet = {
- val outputStream = new BufferingOutputStream(math.min(math.max(message.size, 1024), 1 << 16))
- val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
- val compressed = CompressionFactory(message.compressionCodec, inputStream)
- try {
- outputStream.write(compressed)
- } finally {
- compressed.close()
+ /** Deep iterator that decompresses the message sets in-place. */
+ def deepIterator(wrapperMessage: Message): Iterator[MessageAndOffset] = {
+ new IteratorTemplate[MessageAndOffset] {
+
+ val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
+ val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+
+ override def makeNext(): MessageAndOffset = {
+ try {
+ // read the offset
+ val offset = compressed.readLong()
+ // read record size
+ val size = compressed.readInt()
+
+ if (size < Message.MinHeaderSize)
+ throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")
+
+ // read the record into an intermediate record buffer
+ // and hence has to do an extra copy
+ val bufferArray = new Array[Byte](size)
+ compressed.readFully(bufferArray, 0, size)
+ val buffer = ByteBuffer.wrap(bufferArray)
+
+ val newMessage = new Message(buffer)
+
+ // the decompressed message should not be a wrapper message since we do not allow nested compression
+ new MessageAndOffset(newMessage, offset)
+ } catch {
+ case eofe: EOFException =>
+ compressed.close()
+ allDone()
+ case ioe: IOException =>
+ throw new KafkaException(ioe)
+ }
+ }
}
- val outputBuffer = ByteBuffer.allocate(outputStream.size)
- outputStream.writeTo(outputBuffer)
- outputBuffer.rewind
- new ByteBufferMessageSet(outputBuffer)
}
private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
@@ -150,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
var topIter = buffer.slice()
var innerIter: Iterator[MessageAndOffset] = null
- def innerDone():Boolean = (innerIter == null || !innerIter.hasNext)
+ def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext)
def makeNextOuter: MessageAndOffset = {
// if there isn't at least an offset and size, we are done
@@ -159,7 +183,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
val offset = topIter.getLong()
val size = topIter.getInt()
if(size < Message.MinHeaderSize)
- throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
+ throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")
// we have an incomplete message
if(topIter.remaining < size)
@@ -179,7 +203,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
innerIter = null
new MessageAndOffset(newMessage, offset)
case _ =>
- innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator()
+ innerIter = ByteBufferMessageSet.deepIterator(newMessage)
if(!innerIter.hasNext)
innerIter = null
makeNext()
@@ -194,7 +218,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
if(innerDone())
makeNextOuter
else
- innerIter.next
+ innerIter.next()
}
}
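For reference, a hedged usage sketch of how deepIterator plugs into the shallow/deep split in ByteBufferMessageSet (the varargs constructor and shallowIterator as they exist on this branch; the println output is illustrative only). Note that deepIterator treats EOF on the decompression stream as the normal end-of-set signal (the allDone() path above), closing the stream before terminating:

    val set = new ByteBufferMessageSet(DefaultCompressionCodec,
                                       new Message("hello".getBytes), new Message("there".getBytes))
    // shallow iteration yields the single compressed wrapper message
    for (mo <- set.shallowIterator)
      println("wrapper offset = " + mo.offset)
    // the default (deep) iterator decompresses via deepIterator, yielding the inner messages
    for (mo <- set.iterator)
      println("inner offset = " + mo.offset)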
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fe2cc11..b7a3630 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -180,7 +180,7 @@ object DumpLogSegments {
case NoCompressionCodec =>
getSingleMessageIterator(messageAndOffset)
case _ =>
- ByteBufferMessageSet.decompress(message).iterator
+ ByteBufferMessageSet.deepIterator(message)
}
} else
getSingleMessageIterator(messageAndOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8cd5f2f..3c0599c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -269,13 +269,13 @@ class LogTest extends JUnitSuite {
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
- def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
+ def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head.message)
/* we should always get the first message in the compressed set when reading any offset in the set */
- assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
- assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
- assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
- assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
+ assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
+ assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset)
+ assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset)
+ assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5208a5..812df59 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -31,7 +31,7 @@ import kafka.api.ProducerResponseStatus
import kafka.common.{TopicAndPartition, ErrorMapping}
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
- private var messageBytes = new Array[Byte](2);
+ private val messageBytes = new Array[Byte](2)
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head))
val zookeeperConnect = TestZKUtils.zookeeperConnect