Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:30 UTC
[3/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 250c8b8..65c2d05 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -22,10 +22,9 @@ import java.util.Properties
import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
import kafka.common.TopicAndPartition
-import kafka.message._
import kafka.server.OffsetCheckpoint
import kafka.utils._
-import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit._
@@ -43,7 +42,7 @@ import scala.util.Random
@RunWith(value = classOf[Parameterized])
class LogCleanerIntegrationTest(compressionCodec: String) {
- val codec = CompressionCodec.getCompressionCodec(compressionCodec)
+ val codec = CompressionType.forName(compressionCodec)
val time = new MockTime()
val segmentSize = 256
val deleteDelay = 1000
@@ -56,7 +55,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@Test
def cleanerTest() {
val largeMessageKey = 20
- val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V1)
+ val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V1)
val maxMessageSize = largeMessageSet.sizeInBytes
cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
@@ -133,13 +132,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
- // returns (value, ByteBufferMessageSet)
+ // returns (value, MemoryRecords)
- private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = {
+ private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
def messageValue(length: Int): String = {
val random = new Random(0)
new String(random.alphanumeric.take(length).toArray)
}
val value = messageValue(128)
- val messageSet = TestUtils.singleMessageSet(payload = value.getBytes, codec = codec, key = key.toString.getBytes,
+ val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
magicValue = messageFormatVersion)
(value, messageSet)
}
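(For reference: the pattern above replaces a ByteBufferMessageSet with a
client-side MemoryRecords instance. A minimal sketch, using only calls that
appear in this patch; the literals are illustrative:

    import org.apache.kafka.common.record.{MemoryRecords, Record}

    // one keyed record with magic v1 and no timestamp
    val records: MemoryRecords = MemoryRecords.withRecords(
      Record.create(Record.MAGIC_VALUE_V1, Record.NO_TIMESTAMP,
        "key".getBytes, "value".getBytes))

    records.sizeInBytes  // replaces MessageSet.sizeInBytes

The TestUtils.singletonRecords helper used above takes the same ingredients
as named parameters.)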
@@ -147,9 +146,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@Test
def testCleanerWithMessageFormatV0(): Unit = {
val largeMessageKey = 20
- val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V0)
+ val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V0)
val maxMessageSize = codec match {
- case NoCompressionCodec => largeMessageSet.sizeInBytes
+ case CompressionType.NONE => largeMessageSet.sizeInBytes
case _ =>
// the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
// increase because the broker offsets are larger than the ones assigned by the client
@@ -165,7 +164,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
log.config = new LogConfig(props)
- val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+ val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
val startSize = log.size
cleaner.startup()
@@ -177,14 +176,14 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
checkLogAfterAppendingDups(log, startSize, appends)
val appends2: Seq[(Int, String, Long)] = {
- val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+ val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
val appendInfo = log.append(largeMessageSet, assignOffsets = true)
val largeMessageOffset = appendInfo.firstOffset
// also add some messages with version 1 to check that we handle mixed format versions correctly
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
log.config = new LogConfig(props)
- val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+ val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1
}
val firstDirty2 = log.activeSegment.baseOffset
@@ -205,15 +204,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
// with compression enabled, these messages will be written as a single message containing
// all of the individual messages
- var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
- appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+ var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+ appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
log.config = new LogConfig(props)
- var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
- appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
- appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+ var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
+ appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
+ appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
val appends = appendsV0 ++ appendsV1
@@ -250,32 +249,27 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
-
- def messageIterator(entry: MessageAndOffset): Iterator[MessageAndOffset] =
- // create single message iterator or deep iterator depending on compression codec
- if (entry.message.compressionCodec == NoCompressionCodec) Iterator(entry)
- else ByteBufferMessageSet.deepIterator(entry)
-
- for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- messageIterator(entry)) yield {
- val key = TestUtils.readString(messageAndOffset.message.key).toInt
- val value = TestUtils.readString(messageAndOffset.message.payload)
- (key, value, messageAndOffset.offset)
+ import JavaConverters._
+ for (segment <- log.logSegments; deepLogEntry <- segment.log.deepIterator.asScala) yield {
+ val key = TestUtils.readString(deepLogEntry.record.key).toInt
+ val value = TestUtils.readString(deepLogEntry.record.value)
+ (key, value, deepLogEntry.offset)
}
}
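(The readFromLog rewrite above works because deepIterator handles compressed
and uncompressed entries uniformly, where the old code had to pick between a
single-message iterator and ByteBufferMessageSet.deepIterator. A minimal
sketch of decoding entries this way, assuming a MemoryRecords built as in
this patch:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record.{MemoryRecords, Record}
    import kafka.utils.TestUtils

    val records = MemoryRecords.withRecords(
      Record.create("1".getBytes, "hello".getBytes))

    for (entry <- records.deepIterator.asScala) {
      val key   = TestUtils.readString(entry.record.key)   // ByteBuffer -> String
      val value = TestUtils.readString(entry.record.value)
      println(s"$key -> $value @ ${entry.offset}")
    }
)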
- private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
- startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
+ private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+ startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
- val payload = counter.toString
- val appendInfo = log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
+ val value = counter.toString
+ val appendInfo = log.append(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true)
counter += 1
- (key, payload, appendInfo.firstOffset)
+ (key, value, appendInfo.firstOffset)
}
}
- private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
- startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
+ private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+ startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
counter += 1
@@ -283,11 +277,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
val messages = kvs.map { case (key, payload) =>
- new Message(payload.toString.getBytes, key.toString.getBytes, Message.NoTimestamp, magicValue)
+ Record.create(magicValue, key.toString.getBytes, payload.toString.getBytes)
}
- val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*)
- val appendInfo = log.append(messageSet, assignOffsets = true)
+ val records = MemoryRecords.withRecords(codec, messages: _*)
+ val appendInfo = log.append(records, assignOffsets = true)
val offsets = appendInfo.firstOffset to appendInfo.lastOffset
kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
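(As noted earlier in this patch, with compression enabled the individual
records end up inside a single wrapper entry; MemoryRecords.withRecords(codec,
...) produces exactly that. A sketch under the same assumptions:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

    val batch = MemoryRecords.withRecords(CompressionType.GZIP,
      Record.create("a".getBytes, "1".getBytes),
      Record.create("b".getBytes, "2".getBytes))

    // expect one shallow (wrapper) entry and two deep entries
    println(batch.shallowIterator.asScala.size)  // 1
    println(batch.deepIterator.asScala.size)     // 2
)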
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 5e029fc..abab3bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.util.Properties
import kafka.common.TopicAndPartition
-import kafka.message._
import kafka.utils._
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
@@ -33,7 +32,6 @@ import org.junit.runners.Parameterized.Parameters
import scala.collection._
-
/**
* This is an integration test that tests the fully integrated log cleaner
*/
@@ -52,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
val logDir = TestUtils.tempDir()
var counter = 0
val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
- val compressionCodec = CompressionCodec.getCompressionCodec(compressionCodecName)
+ val compressionCodec = CompressionType.forName(compressionCodecName)
@Test
def cleanerTest(): Unit = {
@@ -96,7 +94,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum
debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize")
- val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
+ val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition("log", 0))
assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset)
assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
sizeUpToActiveSegmentAtT0 > compactedSize)
@@ -106,23 +104,19 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
}
private def readFromLog(log: Log): Iterable[(Int, Int)] = {
- for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
- // create single message iterator or deep iterator depending on compression codec
- if (entry.message.compressionCodec == NoCompressionCodec)
- Stream.cons(entry, Stream.empty).iterator
- else
- ByteBufferMessageSet.deepIterator(entry)
- }) yield {
- val key = TestUtils.readString(messageAndOffset.message.key).toInt
- val value = TestUtils.readString(messageAndOffset.message.payload).toInt
+ import JavaConverters._
+
+ for (segment <- log.logSegments; logEntry <- segment.log.deepIterator.asScala) yield {
+ val key = TestUtils.readString(logEntry.record.key).toInt
+ val value = TestUtils.readString(logEntry.record.value).toInt
key -> value
}
}
- private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
+ private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long): Seq[(Int, Int)] = {
for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
- log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+ log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
counter += 1
(key, count)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 0cd52d6..5dfa268 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -21,8 +21,8 @@ import java.io.File
import java.util.Properties
import kafka.common._
-import kafka.message._
import kafka.utils._
+import org.apache.kafka.common.record.{MemoryRecords, Record}
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Test}
@@ -54,8 +54,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
*/
@Test
def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
- val messageSet = TestUtils.singleMessageSet("test".getBytes)
- val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Delete)
+ val records = TestUtils.singletonRecords("test".getBytes)
+ val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val readyToDelete = cleanerManager.deletableLogs().size
@@ -67,8 +67,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
*/
@Test
def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = {
- val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes)
- val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+ val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+ val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val readyToDelete = cleanerManager.deletableLogs().size
@@ -81,8 +81,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
*/
@Test
def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = {
- val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes)
- val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact)
+ val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+ val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val readyToDelete = cleanerManager.deletableLogs().size
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
while(log.numberOfSegments < 8)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
+ log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
val topicAndPartition = TopicAndPartition("log", 0)
val lastClean = Map(topicAndPartition-> 0L)
@@ -123,7 +123,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val t0 = time.milliseconds
while(log.numberOfSegments < 4)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+ log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
val activeSegAtT0 = log.activeSegment
@@ -131,7 +131,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val t1 = time.milliseconds
while (log.numberOfSegments < 8)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
+ log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
val topicAndPartition = TopicAndPartition("log", 0)
val lastClean = Map(topicAndPartition-> 0L)
@@ -155,7 +155,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val t0 = time.milliseconds
while (log.numberOfSegments < 8)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+ log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
time.sleep(compactionLag + 1)
@@ -192,10 +192,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
- private def message(key: Int, value: Int, timestamp: Long) =
- new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
- bytes = value.toString.getBytes,
- timestamp = timestamp,
- magicValue = Message.MagicValue_V1))
+ private def logEntries(key: Int, value: Int, timestamp: Long) =
+ MemoryRecords.withRecords(Record.create(timestamp, key.toString.getBytes, value.toString.getBytes))
}
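(Record.create also has a (timestamp, key, value) form, which the logEntries
helper above relies on. A short sketch with illustrative values:

    import org.apache.kafka.common.record.{MemoryRecords, Record}

    val now = System.currentTimeMillis
    val entries = MemoryRecords.withRecords(
      Record.create(now, "42".getBytes, "payload".getBytes))
    // appended in the tests above via log.append(entries)
)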
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index d80fba1..a99d4b9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,21 +17,21 @@
package kafka.log
-import java.io.{DataOutputStream, File}
+import java.io.File
import java.nio._
import java.nio.file.Paths
import java.util.Properties
import kafka.common._
-import kafka.message._
import kafka.utils._
-import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
import scala.collection._
+import JavaConverters._
/**
* Unit tests for the log cleaning logic
@@ -66,7 +66,7 @@ class LogCleanerTest extends JUnitSuite {
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
val keysFound = keysInLog(log)
assertEquals(0L until log.logEndOffset, keysFound)
@@ -100,7 +100,7 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
while(log.numberOfSegments < 2)
- log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
+ log.append(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
val keysFound = keysInLog(log)
assertEquals(0L until log.logEndOffset, keysFound)
@@ -123,23 +123,23 @@ class LogCleanerTest extends JUnitSuite {
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
+
// append messages with the keys 0 through N
while(log.numberOfSegments < 2)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
// delete all even keys between 0 and N
val leo = log.logEndOffset
for(key <- 0 until leo.toInt by 2)
- log.append(deleteMessage(key))
-
+ log.append(tombstoneRecord(key))
+
// append some new unique keys to pad out to a new active segment
while(log.numberOfSegments < 4)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
val keys = keysInLog(log).toSet
- assertTrue("None of the keys we deleted should still exist.",
+ assertTrue("None of the keys we deleted should still exist.",
(0 until leo.toInt by 2).forall(!keys.contains(_)))
}
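(Deletes are expressed as records with a null value; the tombstoneRecord(key)
helper introduced further down in this file is just record(key, null). A
minimal sketch:

    import org.apache.kafka.common.record.{MemoryRecords, Record}

    // a tombstone: key present, value null; compaction removes earlier
    // values for the key, and eventually the tombstone itself
    val tombstone = MemoryRecords.withRecords(
      Record.create("0".getBytes, null))
)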
@@ -151,11 +151,11 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
- log.append(message(0,0)) // offset 0
- log.append(message(1,1)) // offset 1
- log.append(message(0,0)) // offset 2
- log.append(message(1,1)) // offset 3
- log.append(message(0,0)) // offset 4
+ log.append(record(0,0)) // offset 0
+ log.append(record(1,1)) // offset 1
+ log.append(record(0,0)) // offset 2
+ log.append(record(1,1)) // offset 3
+ log.append(record(0,0)) // offset 4
// roll the segment, so we can clean the messages already appended
log.roll()
@@ -180,11 +180,11 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
- log.append(message(0,0)) // offset 0
- log.append(message(1,1)) // offset 1
- log.append(message(0,0)) // offset 2
- log.append(message(1,1)) // offset 3
- log.append(message(0,0)) // offset 4
+ log.append(record(0,0)) // offset 0
+ log.append(record(1,1)) // offset 1
+ log.append(record(0,0)) // offset 2
+ log.append(record(1,1)) // offset 3
+ log.append(record(0,0)) // offset 4
// roll the segment, so we can clean the messages already appended
log.roll()
@@ -218,18 +218,18 @@ class LogCleanerTest extends JUnitSuite {
// append messages with the keys 0 through N-1, values equal offset
while(log.numberOfSegments <= numCleanableSegments)
- log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
// at this point one message past the cleanable segments has been added
// the entire segment containing the first uncleanable offset should not be cleaned.
val firstUncleanableOffset = log.logEndOffset + 1 // +1 so it is past the baseOffset
while(log.numberOfSegments < numTotalSegments - 1)
- log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
// the last (active) segment has just one message
- def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
+ def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowIterator.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
@@ -253,7 +253,7 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
- val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+ val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
@@ -271,7 +271,7 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
- val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+ val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
@@ -305,14 +305,14 @@ class LogCleanerTest extends JUnitSuite {
// append unkeyed messages
while(log.numberOfSegments < 2)
- log.append(unkeyedMessage(log.logEndOffset.toInt))
+ log.append(unkeyedRecord(log.logEndOffset.toInt))
val numInvalidMessages = unkeyedMessageCountInLog(log)
val sizeWithUnkeyedMessages = log.size
// append keyed messages
while(log.numberOfSegments < 3)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
@@ -321,17 +321,17 @@ class LogCleanerTest extends JUnitSuite {
assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, stats.invalidMessagesRead)
}
-
+
/* extract all the keys from a log */
def keysInLog(log: Log): Iterable[Int] =
- log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
+ log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
/* extract all the offsets from a log */
def offsetsInLog(log: Log): Iterable[Long] =
- log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
+ log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
def unkeyedMessageCountInLog(log: Log) =
- log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
+ log.logSegments.map(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
throw new LogCleaningAbortedException()
@@ -350,7 +350,7 @@ class LogCleanerTest extends JUnitSuite {
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
val keys = keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
@@ -371,20 +371,20 @@ class LogCleanerTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
+
// append some messages to the log
var i = 0
while(log.numberOfSegments < 10) {
- log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+ log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
i += 1
}
-
+
// grouping by very large values should result in a single group with all the segments in it
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
assertEquals(1, groups.size)
assertEquals(log.numberOfSegments, groups.head.size)
checkSegmentOrder(groups)
-
+
// grouping by very small values should result in all groups having one entry
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
assertEquals(log.numberOfSegments, groups.size)
@@ -396,20 +396,20 @@ class LogCleanerTest extends JUnitSuite {
checkSegmentOrder(groups)
val groupSize = 3
-
+
// check grouping by log size
val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
checkSegmentOrder(groups)
assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
-
+
// check grouping by index size
val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
checkSegmentOrder(groups)
assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
}
-
+
/**
* Validate the logic for grouping log segments together for cleaning when only a small number of
* messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
@@ -425,47 +425,45 @@ class LogCleanerTest extends JUnitSuite {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
+
// fill up first segment
while (log.numberOfSegments == 1)
- log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-
+ log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+
// forward offset and append message to next segment at offset Int.MaxValue
- val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1),
- new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
- log.append(messageSet, assignOffsets = false)
- log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+ val records = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue - 1, Record.create("hello".getBytes, "hello".getBytes)))
+ log.append(records, assignOffsets = false)
+ log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
-
+
// grouping should result in a single group with maximum relative offset of Int.MaxValue
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
assertEquals(1, groups.size)
-
+
// append another message, making last offset of second segment > Int.MaxValue
- log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-
+ log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+
// grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
assertEquals(2, groups.size)
checkSegmentOrder(groups)
-
+
// append more messages, creating new segments, further grouping should still occur
while (log.numberOfSegments < 4)
- log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+ log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
assertEquals(log.numberOfSegments - 1, groups.size)
for (group <- groups)
assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
checkSegmentOrder(groups)
-
}
-
+
private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
val offsets = groups.flatMap(_.map(_.baseOffset))
assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
}
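(To place an entry at an explicit absolute offset, as the test above does to
reach Int.MaxValue - 1, the patch wraps a Record in a LogEntry and appends
with assignOffsets = false. A sketch using the same calls:

    import org.apache.kafka.common.record.{LogEntry, MemoryRecords, Record}

    val atOffset = MemoryRecords.withLogEntries(
      LogEntry.create(Int.MaxValue - 1L,
        Record.create("hello".getBytes, "hello".getBytes)))
    // log.append(atOffset, assignOffsets = false)
)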
-
+
/**
* Test building an offset map off the log
*/
@@ -496,8 +494,7 @@ class LogCleanerTest extends JUnitSuite {
checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
}
-
-
+
/**
* Tests recovery if broker crashes at the following stages during the cleaning sequence
* <ol>
@@ -516,8 +513,8 @@ class LogCleanerTest extends JUnitSuite {
logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
-
- def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
+
+ def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
// Recover log file and check that after recovery, keys are as expected
// and all temporary files have been deleted
val recoveredLog = makeLog(config = config)
@@ -530,25 +527,25 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(expectedKeys, keysInLog(recoveredLog))
recoveredLog
}
-
+
// create a log and append some messages
var log = makeLog(config = config)
var messageCount = 0
while(log.numberOfSegments < 10) {
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
messageCount += 1
}
val allKeys = keysInLog(log)
-
+
// pretend we have odd-numbered keys
val offsetMap = new FakeOffsetMap(Int.MaxValue)
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
-
+
// clean the log
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
var cleanedKeys = keysInLog(log)
-
+
// 1) Simulate recovery just after .cleaned file is created, before rename to .swap
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
@@ -556,44 +553,44 @@ class LogCleanerTest extends JUnitSuite {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
}
log = recoverAndCheck(config, allKeys)
-
+
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
cleanedKeys = keysInLog(log)
-
+
// 2) Simulate recovery just after swap file is created, before old segment files are
- // renamed to .deleted. Clean operation is resumed during recovery.
+ // renamed to .deleted. Clean operation is resumed during recovery.
log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
- }
+ }
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again
while(log.numberOfSegments < 10) {
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
messageCount += 1
}
for (k <- 1 until messageCount by 2)
- offsetMap.put(key(k), Long.MaxValue)
+ offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
cleanedKeys = keysInLog(log)
-
+
// 3) Simulate recovery after swap file is created and old segments files are renamed
// to .deleted. Clean operation is resumed during recovery.
log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
log = recoverAndCheck(config, cleanedKeys)
-
+
// add some more messages and clean the log again
while(log.numberOfSegments < 10) {
- log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
messageCount += 1
}
for (k <- 1 until messageCount by 2)
- offsetMap.put(key(k), Long.MaxValue)
+ offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
cleanedKeys = keysInLog(log)
-
+
// 4) Simulate recovery after swap is complete, but async deletion
// is not yet complete. Clean operation is resumed during recovery.
recoverAndCheck(config, cleanedKeys)
@@ -631,11 +628,11 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog()
val cleaner = makeCleaner(2)
- log.append(message(0,0))
- log.append(message(1,1))
- log.append(message(2,2))
- log.append(message(3,3))
- log.append(message(4,4))
+ log.append(record(0,0))
+ log.append(record(1,1))
+ log.append(record(2,2))
+ log.append(record(3,3))
+ log.append(record(4,4))
log.roll()
val stats = new CleanerStats()
@@ -653,7 +650,7 @@ class LogCleanerTest extends JUnitSuite {
*/
@Test
def testCleanCorruptMessageSet() {
- val codec = SnappyCompressionCodec
+ val codec = CompressionType.GZIP
val logProps = new Properties()
logProps.put(LogConfig.CompressionTypeProp, codec.name)
@@ -682,10 +679,10 @@ class LogCleanerTest extends JUnitSuite {
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
- for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) {
- assertEquals(shallowMessage.message.magic, deepMessage.message.magic)
- val value = TestUtils.readString(deepMessage.message.payload).toLong
- assertEquals(deepMessage.offset, value)
+ for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowIterator.asScala; deepLogEntry <- shallowLogEntry.asScala) {
+ assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
+ val value = TestUtils.readString(deepLogEntry.record.value).toLong
+ assertEquals(deepLogEntry.offset, value)
}
}
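(The loop above checks that deep (inner) records carry the same magic as
their shallow wrapper after cleaning; a shallow LogEntry is itself iterable
over the deep entries it wraps. A sketch of that relationship:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

    val batch = MemoryRecords.withRecords(CompressionType.GZIP,
      Record.create("k".getBytes, "v".getBytes))

    for (shallow <- batch.shallowIterator.asScala;
         deep <- shallow.asScala)  // deep entries of one shallow entry
      assert(shallow.record.magic == deep.record.magic)
)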
@@ -704,7 +701,7 @@ class LogCleanerTest extends JUnitSuite {
val corruptedMessage = invalidCleanedMessage(offset, set)
val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
- for (logEntry <- records.iterator.asScala) {
+ for (logEntry <- records.deepIterator.asScala) {
val offset = logEntry.offset
val value = TestUtils.readString(logEntry.record.value).toLong
assertEquals(offset, value)
@@ -718,94 +715,64 @@ class LogCleanerTest extends JUnitSuite {
private def invalidCleanedMessage(initialOffset: Long,
keysAndValues: Iterable[(Int, Int)],
- codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = {
+ codec: CompressionType = CompressionType.GZIP): MemoryRecords = {
// this function replicates the old versions of the cleaner which under some circumstances
// would write invalid compressed message sets with the outer magic set to 1 and the inner
// magic set to 0
-
- val messages = keysAndValues.map(kv =>
- new Message(key = kv._1.toString.getBytes,
- bytes = kv._2.toString.getBytes,
- timestamp = Message.NoTimestamp,
- magicValue = Message.MagicValue_V0))
-
- val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
- var lastOffset = initialOffset
-
- messageWriter.write(
- codec = codec,
- timestamp = Message.NoTimestamp,
- timestampType = TimestampType.CREATE_TIME,
- magicValue = Message.MagicValue_V1) { outputStream =>
-
- val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
- try {
- for (message <- messages) {
- val innerOffset = lastOffset - initialOffset
- output.writeLong(innerOffset)
- output.writeInt(message.size)
- output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
- lastOffset += 1
- }
- } finally {
- output.close()
- }
+ val records = keysAndValues.map(kv =>
+ Record.create(Record.MAGIC_VALUE_V0,
+ Record.NO_TIMESTAMP,
+ kv._1.toString.getBytes,
+ kv._2.toString.getBytes))
+
+ val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
+ val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME)
+
+ var offset = initialOffset
+ records.foreach { record =>
+ builder.appendUnchecked(offset, record)
+ offset += 1
}
- val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
- ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
- buffer.rewind()
- new ByteBufferMessageSet(buffer)
+ builder.build()
}
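(The rewritten helper drives MemoryRecords.builder directly, which is what
lets the test reproduce the old cleaner bug: appendUnchecked writes a
prepared record (here magic v0) into a set whose wrapper is built with magic
v1. A compact sketch of the builder life cycle, with illustrative sizes:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record._

    val buffer = ByteBuffer.allocate(1024)
    val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1,
      CompressionType.GZIP, TimestampType.CREATE_TIME)
    builder.appendUnchecked(0L,                    // no magic validation here
      Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP,
        "k".getBytes, "v".getBytes))
    val mixed: MemoryRecords = builder.build()
)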
private def messageWithOffset(key: Int, value: Int, offset: Long) =
- new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
- new Message(key = key.toString.getBytes,
- bytes = value.toString.getBytes,
- timestamp = Message.NoTimestamp,
- magicValue = Message.MagicValue_V1))
-
-
+ MemoryRecords.withLogEntries(LogEntry.create(offset, Record.create(key.toString.getBytes, value.toString.getBytes)))
+
def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ }
def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
- new Cleaner(id = 0,
- offsetMap = new FakeOffsetMap(capacity),
+ new Cleaner(id = 0,
+ offsetMap = new FakeOffsetMap(capacity),
ioBufferSize = maxMessageSize,
maxIoBufferSize = maxMessageSize,
dupBufferLoadFactor = 0.75,
- throttler = throttler,
+ throttler = throttler,
time = time,
checkDone = checkDone )
-
+
def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
for((key, value) <- seq)
- yield log.append(message(key, value)).firstOffset
+ yield log.append(record(key, value)).firstOffset
}
-
+
def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
-
- def message(key: Int, value: Int): ByteBufferMessageSet =
- message(key, value.toString.getBytes)
-
- def message(key: Int, value: Array[Byte]) =
- new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
- bytes = value,
- timestamp = Message.NoTimestamp,
- magicValue = Message.MagicValue_V1))
-
- def unkeyedMessage(value: Int) =
- new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes))
-
- def deleteMessage(key: Int) =
- new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
- bytes = null,
- timestamp = Message.NoTimestamp,
- magicValue = Message.MagicValue_V1))
-
+
+ def record(key: Int, value: Int): MemoryRecords =
+ record(key, value.toString.getBytes)
+
+ def record(key: Int, value: Array[Byte]) =
+ MemoryRecords.withRecords(Record.create(key.toString.getBytes, value))
+
+ def unkeyedRecord(value: Int) =
+ MemoryRecords.withRecords(Record.create(value.toString.getBytes))
+
+ def tombstoneRecord(key: Int) = record(key, null)
+
}
class FakeOffsetMap(val slots: Int) extends OffsetMap {
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 5421da9..40e6228 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -67,7 +67,7 @@ class LogManagerTest {
val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
- log.append(TestUtils.singleMessageSet("test".getBytes()))
+ log.append(TestUtils.singletonRecords("test".getBytes()))
}
/**
@@ -89,7 +89,7 @@ class LogManagerTest {
val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
var offset = 0L
for(_ <- 0 until 200) {
- val set = TestUtils.singleMessageSet("test".getBytes())
+ val set = TestUtils.singletonRecords("test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
@@ -101,7 +101,7 @@ class LogManagerTest {
assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)
assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
- assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes)
+ assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
try {
log.read(0, 1024)
@@ -110,7 +110,7 @@ class LogManagerTest {
case _: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
- log.append(TestUtils.singleMessageSet("test".getBytes()))
+ log.append(TestUtils.singletonRecords("test".getBytes()))
}
/**
@@ -118,7 +118,7 @@ class LogManagerTest {
*/
@Test
def testCleanupSegmentsToMaintainSize() {
- val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
+ val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes
logManager.shutdown()
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer)
@@ -135,7 +135,7 @@ class LogManagerTest {
// add a bunch of messages that should be larger than the retentionSize
val numMessages = 200
for (_ <- 0 until numMessages) {
- val set = TestUtils.singleMessageSet("test".getBytes())
+ val set = TestUtils.singletonRecords("test".getBytes())
val info = log.append(set)
offset = info.firstOffset
}
@@ -147,7 +147,7 @@ class LogManagerTest {
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)
assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
- assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes)
+ assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
try {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
@@ -155,7 +155,7 @@ class LogManagerTest {
case _: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
- log.append(TestUtils.singleMessageSet("test".getBytes()))
+ log.append(TestUtils.singletonRecords("test".getBytes()))
}
/**
@@ -169,7 +169,7 @@ class LogManagerTest {
val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
var offset = 0L
for (_ <- 0 until 200) {
- val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+ val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
@@ -198,7 +198,7 @@ class LogManagerTest {
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
for (_ <- 0 until 200) {
- val set = TestUtils.singleMessageSet("test".getBytes())
+ val set = TestUtils.singletonRecords("test".getBytes())
log.append(set)
}
time.sleep(logManager.InitialTaskDelayMs)
@@ -280,7 +280,7 @@ class LogManagerTest {
val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
logs.foreach(log => {
for (_ <- 0 until 50)
- log.append(TestUtils.singleMessageSet("test".getBytes()))
+ log.append(TestUtils.singletonRecords("test".getBytes()))
log.flush()
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index f02c5cb..d99981a 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -16,15 +16,13 @@
*/
package kafka.log
-import org.junit.Assert._
-import java.util.concurrent.atomic._
-
-import kafka.common.LongRef
-import org.junit.{After, Test}
import kafka.utils.TestUtils
-import kafka.message._
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
import org.apache.kafka.common.utils.Time
+import org.junit.Assert._
+import org.junit.{After, Test}
+import scala.collection.JavaConverters._
import scala.collection._
class LogSegmentTest {
@@ -34,7 +32,7 @@ class LogSegmentTest {
/* create a segment with the given base offset */
def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
val msFile = TestUtils.tempFile()
- val ms = new FileMessageSet(msFile)
+ val ms = FileRecords.open(msFile)
val idxFile = TestUtils.tempFile()
val timeIdxFile = TestUtils.tempFile()
idxFile.delete()
@@ -47,12 +45,10 @@ class LogSegmentTest {
}
- /* create a ByteBufferMessageSet for the given messages starting from the given offset */
+ /* create a MemoryRecords instance for the given strings starting from the given offset */
- def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- offsetCounter = new LongRef(offset),
- messages = messages.map(s => new Message(s.getBytes, offset * 10, Message.MagicValue_V1)):_*)
+ def records(offset: Long, records: String*): MemoryRecords = {
+ MemoryRecords.withRecords(offset, records.map(s => Record.create(Record.MAGIC_VALUE_V1, offset * 10, s.getBytes)):_*)
}
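(With the records helper above, appending to and reading from a segment
follows the pattern of the tests below; per those call sites, append takes
the offset, a timestamp (Record.NO_TIMESTAMP when unused), a timestamp
offset (-1L when unused), and the records. A sketch:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record.Record

    val seg = createSegment(40)              // helper defined above
    val ms  = records(50, "hello", "there")  // helper defined above
    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)

    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
    println(read.deepIterator.asScala.size)  // 2
)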
-
+
@After
def teardown() {
for(seg <- segments) {
@@ -60,7 +56,7 @@ class LogSegmentTest {
seg.log.delete()
}
}
-
+
/**
* A read on an empty log segment should return null
*/
@@ -70,7 +66,7 @@ class LogSegmentTest {
val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
assertNull("Read beyond the last offset in the segment should be null", read)
}
-
+
/**
* Reading from before the first offset in the segment should return messages
* beginning with the first message in the segment
@@ -78,12 +74,12 @@ class LogSegmentTest {
@Test
def testReadBeforeFirstOffset() {
val seg = createSegment(40)
- val ms = messages(50, "hello", "there", "little", "bee")
- seg.append(50, Message.NoTimestamp, -1L, ms)
- val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet
- assertEquals(ms.toList, read.toList)
+ val ms = records(50, "hello", "there", "little", "bee")
+ seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+ val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
+ assertEquals(ms.deepIterator.asScala.toList, read.deepIterator.asScala.toList)
}
-
+
/**
* If we set the startOffset and maxOffset for the read to be the same value
* we should get only the first message in the log
@@ -92,28 +88,28 @@ class LogSegmentTest {
def testMaxOffset() {
val baseOffset = 50
val seg = createSegment(baseOffset)
- val ms = messages(baseOffset, "hello", "there", "beautiful")
- seg.append(baseOffset, Message.NoTimestamp, -1L, ms)
- def validate(offset: Long) =
- assertEquals(ms.filter(_.offset == offset).toList,
- seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList)
+ val ms = records(baseOffset, "hello", "there", "beautiful")
+ seg.append(baseOffset, Record.NO_TIMESTAMP, -1L, ms)
+ def validate(offset: Long) =
+ assertEquals(ms.deepIterator.asScala.filter(_.offset == offset).toList,
+ seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepIterator.asScala.toList)
validate(50)
validate(51)
validate(52)
}
-
+
/**
* If we read from an offset beyond the last offset in the segment we should get null
*/
@Test
def testReadAfterLast() {
val seg = createSegment(40)
- val ms = messages(50, "hello", "there")
- seg.append(50, Message.NoTimestamp, -1L, ms)
+ val ms = records(50, "hello", "there")
+ seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
assertNull("Read beyond the last offset in the segment should give null", read)
}
-
+
/**
* If we read from an offset which doesn't exist we should get a message set beginning
* with the least offset greater than the given startOffset.
@@ -121,14 +117,14 @@ class LogSegmentTest {
@Test
def testReadFromGap() {
val seg = createSegment(40)
- val ms = messages(50, "hello", "there")
- seg.append(50, Message.NoTimestamp, -1L, ms)
- val ms2 = messages(60, "alpha", "beta")
- seg.append(60, Message.NoTimestamp, -1L, ms2)
+ val ms = records(50, "hello", "there")
+ seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+ val ms2 = records(60, "alpha", "beta")
+ seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.toList, read.messageSet.toList)
+ assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
}
-
+
/**
* In a loop append two messages then truncate off the second of those messages and check that we can read
* the first but not the second message.
@@ -138,18 +134,18 @@ class LogSegmentTest {
val seg = createSegment(40)
var offset = 40
for (_ <- 0 until 30) {
- val ms1 = messages(offset, "hello")
- seg.append(offset, Message.NoTimestamp, -1L, ms1)
- val ms2 = messages(offset + 1, "hello")
- seg.append(offset + 1, Message.NoTimestamp, -1L, ms2)
+ val ms1 = records(offset, "hello")
+ seg.append(offset, Record.NO_TIMESTAMP, -1L, ms1)
+ val ms2 = records(offset + 1, "hello")
+ seg.append(offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
// check that we can read back both messages
val read = seg.read(offset, None, 10000)
- assertEquals(List(ms1.head, ms2.head), read.messageSet.toList)
+ assertEquals(List(ms1.deepIterator.next(), ms2.deepIterator.next()), read.records.deepIterator.asScala.toList)
// now truncate off the last message
seg.truncateTo(offset + 1)
val read2 = seg.read(offset, None, 10000)
- assertEquals(1, read2.messageSet.size)
- assertEquals(ms1.head, read2.messageSet.head)
+ assertEquals(1, read2.records.deepIterator.asScala.size)
+ assertEquals(ms1.deepIterator.next(), read2.records.deepIterator.next())
offset += 1
}
}
@@ -157,10 +153,10 @@ class LogSegmentTest {
@Test
def testReloadLargestTimestampAfterTruncation() {
val numMessages = 30
- val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
+ val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
var offset = 40
for (_ <- 0 until numMessages) {
- seg.append(offset, offset, offset, messages(offset, "hello"))
+ seg.append(offset, offset, offset, records(offset, "hello"))
offset += 1
}
val expectedNumEntries = numMessages / 2 - 1
@@ -179,10 +175,10 @@ class LogSegmentTest {
def testTruncateFull() {
// test the case where we fully truncate the log
val seg = createSegment(40)
- seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
+ seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
seg.truncateTo(0)
assertNull("Segment should be empty.", seg.read(0, None, 1024))
- seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
+ seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
}
/**
@@ -190,11 +186,11 @@ class LogSegmentTest {
*/
@Test
def testFindOffsetByTimestamp() {
- val messageSize = messages(0, s"msg00").sizeInBytes
+ val messageSize = records(0, s"msg00").sizeInBytes
val seg = createSegment(40, messageSize * 2 - 1)
// Produce some messages
for (i <- 40 until 50)
- seg.append(i, i * 10, i, messages(i, s"msg$i"))
+ seg.append(i, i * 10, i, records(i, s"msg$i"))
assertEquals(490, seg.largestTimestamp)
// Search for an indexed timestamp
@@ -218,10 +214,10 @@ class LogSegmentTest {
def testNextOffsetCalculation() {
val seg = createSegment(40)
assertEquals(40, seg.nextOffset)
- seg.append(50, Message.NoTimestamp, -1L, messages(50, "hello", "there", "you"))
+ seg.append(50, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
assertEquals(53, seg.nextOffset())
}
-
+
/**
* Test that we can change the file suffixes for the log and index files
*/
@@ -236,7 +232,7 @@ class LogSegmentTest {
assertTrue(seg.log.file.exists)
assertTrue(seg.index.file.exists)
}
-
+
/**
* Create a segment with some data and an index. Then corrupt the index,
* and recover the segment, the entries should all be readable.
@@ -245,12 +241,12 @@ class LogSegmentTest {
def testRecoveryFixesCorruptIndex() {
val seg = createSegment(0)
for(i <- 0 until 100)
- seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
+ seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
val indexFile = seg.index.file
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(64*1024)
for(i <- 0 until 100)
- assertEquals(i, seg.read(i, Some(i + 1), 1024).messageSet.head.offset)
+ assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepIterator.next().offset)
}
/**
@@ -261,7 +257,7 @@ class LogSegmentTest {
def testRecoveryFixesCorruptTimeIndex() {
val seg = createSegment(0)
for(i <- 0 until 100)
- seg.append(i, i * 10, i, messages(i, i.toString))
+ seg.append(i, i * 10, i, records(i, i.toString))
val timeIndexFile = seg.timeIndex.file
TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
seg.recover(64*1024)
@@ -271,7 +267,7 @@ class LogSegmentTest {
assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get.offset)
}
}
-
+
/**
* Randomly corrupt a log a number of times and attempt recovery.
*/
@@ -281,13 +277,15 @@ class LogSegmentTest {
for (_ <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
- seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
+ seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end
- val position = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)._1.position + TestUtils.random.nextInt(15)
- TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
+
+ val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
+ val position = recordPosition.position + TestUtils.random.nextInt(15)
+ TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
seg.recover(64*1024)
- assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
+ assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowIterator.asScala.map(_.offset).toList)
seg.delete()
}
}
@@ -304,12 +302,12 @@ class LogSegmentTest {
@Test
def testCreateWithInitFileSizeAppendMessage() {
val seg = createSegment(40, false, 512*1024*1024, true)
- val ms = messages(50, "hello", "there")
- seg.append(50, Message.NoTimestamp, -1L, ms)
- val ms2 = messages(60, "alpha", "beta")
- seg.append(60, Message.NoTimestamp, -1L, ms2)
+ val ms = records(50, "hello", "there")
+ seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+ val ms2 = records(60, "alpha", "beta")
+ seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.toList, read.messageSet.toList)
+ assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
}
/* create a segment with pre allocate and clearly shut down*/
@@ -318,12 +316,12 @@ class LogSegmentTest {
val tempDir = TestUtils.tempDir()
val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
- val ms = messages(50, "hello", "there")
- seg.append(50, Message.NoTimestamp, -1L, ms)
- val ms2 = messages(60, "alpha", "beta")
- seg.append(60, Message.NoTimestamp, -1L, ms2)
+ val ms = records(50, "hello", "there")
+ seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+ val ms2 = records(60, "alpha", "beta")
+ seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.toList, read.messageSet.toList)
+ assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
val oldSize = seg.log.sizeInBytes()
val oldPosition = seg.log.channel.position
val oldFileSize = seg.log.file.length
@@ -336,7 +334,7 @@ class LogSegmentTest {
segments += segReopen
val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.toList, readAgain.messageSet.toList)
+ assertEquals(ms2.deepIterator.asScala.toList, readAgain.records.deepIterator.asScala.toList)
val size = segReopen.log.sizeInBytes()
val position = segReopen.log.channel.position
val fileSize = segReopen.log.file.length