Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/24 19:43:55 UTC
[02/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 5825ab7..c87e927 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -18,16 +18,18 @@
package kafka.log
import java.io._
+import java.nio.ByteBuffer
import java.util.Properties
-import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
+import org.apache.kafka.common.errors._
import kafka.api.ApiVersion
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils._
import kafka.server.KafkaConfig
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP
+import org.apache.kafka.common.record.{RecordBatch, _}
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
@@ -113,7 +115,7 @@ class LogTest extends JUnitSuite {
val numSegments = log.numberOfSegments
time.sleep(log.config.segmentMs + 1)
- log.append(MemoryRecords.withLogEntries())
+ log.append(MemoryRecords.withRecords(CompressionType.NONE))
assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
}
@@ -127,7 +129,7 @@ class LogTest extends JUnitSuite {
val maxJitter = 20 * 60L
val logProps = new Properties()
- logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long)
+ logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long)
// create a log
val log = new Log(logDir,
@@ -193,17 +195,19 @@ class LogTest extends JUnitSuite {
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
- val records = (0 until 100 by 2).map(id => Record.create(time.milliseconds, null, id.toString.getBytes)).toArray
+ val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
- for(i <- records.indices)
- log.append(MemoryRecords.withRecords(records(i)))
+ for(value <- values)
+ log.append(TestUtils.singletonRecords(value = value))
- for(i <- records.indices) {
- val read = log.read(i, 100, Some(i+1)).records.shallowEntries.iterator.next()
- assertEquals("Offset read should match order appended.", i, read.offset)
- assertEquals("Message should match appended.", records(i), read.record)
+ for(i <- values.indices) {
+ val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+ assertEquals("Offset read should match order appended.", i, read.lastOffset)
+ val actual = read.iterator.next()
+ assertNull("Key should be null", actual.key)
+ assertEquals("Values not equal", ByteBuffer.wrap(values(i)), actual.value)
}
- assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowEntries.asScala.size)
+ assertEquals("Reading beyond the last message returns nothing.", 0, log.read(values.length, 100, None).records.batches.asScala.size)
}
/**
@@ -213,19 +217,19 @@ class LogTest extends JUnitSuite {
@Test
def testAppendAndReadWithNonSequentialOffsets() {
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
- val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
+ val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
// now test the case where we supply the offsets ourselves, using non-sequential values
for(i <- records.indices)
- log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
+ log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
for(i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
- val read = log.read(i, 100, None).records.shallowEntries.iterator.next()
+ val read = log.read(i, 100, None).records.records.iterator.next()
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
- assertEquals("Message should match appended.", records(idx), read.record)
+ assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
}
}
@@ -243,26 +247,26 @@ class LogTest extends JUnitSuite {
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
- log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, "42".getBytes)))
+ log.append(TestUtils.singletonRecords(value = "42".getBytes))
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
assertEquals("A read should now return the last message in the log", log.logEndOffset - 1,
- log.read(1, 200, None).records.shallowEntries.iterator.next().offset)
+ log.read(1, 200, None).records.batches.iterator.next().lastOffset)
}
@Test
def testReadWithMinMessage() {
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
- val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
+ val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
// now test the case where we supply the offsets ourselves, using non-sequential values
for (i <- records.indices)
- log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
+ log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
for (i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
@@ -270,28 +274,27 @@ class LogTest extends JUnitSuite {
log.read(i, 1, minOneMessage = true),
log.read(i, 100, minOneMessage = true),
log.read(i, 100, Some(10000), minOneMessage = true)
- ).map(_.records.shallowEntries.iterator.next())
+ ).map(_.records.records.iterator.next())
reads.foreach { read =>
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
- assertEquals("Message should match appended.", records(idx), read.record)
+ assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
}
- assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowEntries.asScala.toIndexedSeq)
+ assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq)
}
-
}
@Test
def testReadWithTooSmallMaxLength() {
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
- val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
+ val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
// now test the case where we supply the offsets ourselves, using non-sequential values
for (i <- records.indices)
- log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
+ log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
for (i <- 50 until messageIds.max) {
assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records)
@@ -322,7 +325,7 @@ class LogTest extends JUnitSuite {
// set up replica log starting with offset 1024 and with one message (at offset 1024)
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
- log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, "42".getBytes)))
+ log.append(TestUtils.singletonRecords(value = "42".getBytes))
assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -362,15 +365,19 @@ class LogTest extends JUnitSuite {
/* do successive reads to ensure all our messages are there */
var offset = 0L
for(i <- 0 until numMessages) {
- val messages = log.read(offset, 1024*1024).records.shallowEntries
+ val messages = log.read(offset, 1024*1024).records.batches
val head = messages.iterator.next()
- assertEquals("Offsets not equal", offset, head.offset)
- assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record,
- head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic, TimestampType.NO_TIMESTAMP_TYPE))
- offset = head.offset + 1
+ assertEquals("Offsets not equal", offset, head.lastOffset)
+
+ val expected = messageSets(i).records.iterator.next()
+ val actual = head.iterator.next()
+ assertEquals(s"Keys not equal at offset $offset", expected.key, actual.key)
+ assertEquals(s"Values not equal at offset $offset", expected.value, actual.value)
+ assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp)
+ offset = head.lastOffset + 1
}
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
- assertEquals("Should be no more messages", 0, lastRead.shallowEntries.asScala.size)
+ assertEquals("Should be no more messages", 0, lastRead.records.asScala.size)
// check that rolling the log forced a flush of the log--the flush is async so retry in case of failure
TestUtils.retry(1000L){
@@ -385,24 +392,20 @@ class LogTest extends JUnitSuite {
def testCompressedMessages() {
/* this log should roll after every messageset */
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
- log.append(MemoryRecords.withRecords(CompressionType.GZIP,
- Record.create(time.milliseconds, null, "hello".getBytes),
- Record.create(time.milliseconds, null, "there".getBytes)))
- log.append(MemoryRecords.withRecords(CompressionType.GZIP,
- Record.create(time.milliseconds, null, "alpha".getBytes),
- Record.create(time.milliseconds, null, "beta".getBytes)))
+ log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
+ log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)))
- def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
+ def read(offset: Int) = log.read(offset, 4096).records.records
/* we should always get the first message in the compressed set when reading any offset in the set */
- assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
- assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset)
- assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset)
- assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset)
+ assertEquals("Read at offset 0 should produce 0", 0, read(0).iterator.next().offset)
+ assertEquals("Read at offset 1 should produce 0", 0, read(1).iterator.next().offset)
+ assertEquals("Read at offset 2 should produce 2", 2, read(2).iterator.next().offset)
+ assertEquals("Read at offset 3 should produce 2", 2, read(3).iterator.next().offset)
}
/**
@@ -445,8 +448,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testMessageSetSizeCheck() {
- val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes),
- Record.create(time.milliseconds, null, "bethe".getBytes))
+ val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
// append messages to log
val configSegmentSize = messageSet.sizeInBytes - 1
val logProps = new Properties()
@@ -465,9 +467,9 @@ class LogTest extends JUnitSuite {
@Test
def testCompactedTopicConstraints() {
- val keyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, time.milliseconds, "and here it is".getBytes, "this message has a key".getBytes)
- val anotherKeyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, time.milliseconds, "another key".getBytes, "this message also has a key".getBytes)
- val unkeyedMessage = Record.create("this message does not have a key".getBytes)
+ val keyedMessage = new SimpleRecord("and here it is".getBytes, "this message has a key".getBytes)
+ val anotherKeyedMessage = new SimpleRecord("another key".getBytes, "this message also has a key".getBytes)
+ val unkeyedMessage = new SimpleRecord("this message does not have a key".getBytes)
val messageSetWithUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage, keyedMessage)
val messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage)
@@ -513,11 +515,10 @@ class LogTest extends JUnitSuite {
*/
@Test
def testMessageSizeCheck() {
- val first = MemoryRecords.withRecords(CompressionType.NONE,
- Record.create(time.milliseconds, null, "You".getBytes),
- Record.create(time.milliseconds, null, "bethe".getBytes))
+ val first = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
val second = MemoryRecords.withRecords(CompressionType.NONE,
- Record.create(time.milliseconds, null, "change (I need more bytes)".getBytes))
+ new SimpleRecord("change (I need more bytes)... blah blah blah.".getBytes),
+ new SimpleRecord("More padding boo hoo".getBytes))
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
@@ -599,7 +600,7 @@ class LogTest extends JUnitSuite {
val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
val messages = (0 until numMessages).map { i =>
- MemoryRecords.withLogEntries(LogEntry.create(100 + i, Record.create(Record.MAGIC_VALUE_V1, time.milliseconds + i, i.toString.getBytes())))
+ MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
}
messages.foreach(log.append(_, assignOffsets = false))
val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
@@ -637,7 +638,7 @@ class LogTest extends JUnitSuite {
assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
for(i <- 0 until numMessages) {
- assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
+ assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
if (i == 0)
assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
else
@@ -715,7 +716,7 @@ class LogTest extends JUnitSuite {
log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages) {
- assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
+ assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
if (i == 0)
assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
else
@@ -786,12 +787,12 @@ class LogTest extends JUnitSuite {
*/
@Test
def testIndexResizingAtTruncation() {
- val setSize = TestUtils.singletonRecords(value = "test".getBytes).sizeInBytes
+ val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds).sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
- logProps.put(LogConfig.IndexIntervalBytesProp, (setSize - 1): java.lang.Integer)
+ logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -970,10 +971,10 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
- log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, null)))
- val head = log.read(0, 4096, None).records.shallowEntries().iterator.next()
+ log.append(TestUtils.singletonRecords(value = null))
+ val head = log.read(0, 4096, None).records.records.iterator.next()
assertEquals(0, head.offset)
- assertTrue("Message payload should be null.", head.record.hasNullValue)
+ assertTrue("Message payload should be null.", !head.hasValue)
}
@Test(expected = classOf[IllegalArgumentException])
@@ -983,10 +984,10 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
- val messages = (0 until 2).map(id => Record.create(time.milliseconds, null, id.toString.getBytes)).toArray
- messages.foreach(record => log.append(MemoryRecords.withRecords(record)))
- val invalidMessage = MemoryRecords.withRecords(Record.create(1.toString.getBytes))
- log.append(invalidMessage, assignOffsets = false)
+ val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
+ records.foreach(record => log.append(MemoryRecords.withRecords(CompressionType.NONE, record)))
+ val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
+ log.append(invalidRecord, assignOffsets = false)
}
@Test
@@ -996,7 +997,8 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
- log.append(MemoryRecords.withRecords(Record.create(Record.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
+ log.append(MemoryRecords.withRecords(CompressionType.NONE,
+ new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
}
@Test
@@ -1020,7 +1022,7 @@ class LogTest extends JUnitSuite {
val numMessages = 50 + TestUtils.random.nextInt(50)
for (_ <- 0 until numMessages)
log.append(set)
- val messages = log.logSegments.flatMap(_.log.deepEntries.asScala.toList)
+ val records = log.logSegments.flatMap(_.log.records.asScala.toList).toList
log.close()
// corrupt index and log by appending random bytes
@@ -1030,8 +1032,18 @@ class LogTest extends JUnitSuite {
// attempt recovery
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
assertEquals(numMessages, log.logEndOffset)
- assertEquals("Messages in the log after recovery should be the same.", messages,
- log.logSegments.flatMap(_.log.deepEntries.asScala.toList))
+
+ val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
+ assertEquals(records.size, recovered.size)
+
+ for (i <- records.indices) {
+ val expected = records(i)
+ val actual = recovered(i)
+ assertEquals(s"Keys not equal", expected.key, actual.key)
+ assertEquals(s"Values not equal", expected.value, actual.value)
+ assertEquals(s"Timestamps not equal", expected.timestamp, actual.timestamp)
+ }
+
Utils.delete(logDir)
}
}
@@ -1049,10 +1061,10 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
- val set1 = MemoryRecords.withRecords(0, Record.create("v1".getBytes(), "k1".getBytes()))
- val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, Record.create("v3".getBytes(), "k3".getBytes()))
- val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, Record.create("v4".getBytes(), "k4".getBytes()))
- val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, Record.create("v5".getBytes(), "k5".getBytes()))
+ val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
+ val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
+ val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+ val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.NONE, new SimpleRecord("v5".getBytes(), "k5".getBytes()))
// Writes into an empty log with baseOffset 0
log.append(set1, false)
assertEquals(0L, log.activeSegment.baseOffset)
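The LogTest changes above all follow one migration pattern: Record.create and
MemoryRecords.withLogEntries become SimpleRecord plus MemoryRecords.withRecords, and the
shallowEntries/deepEntries views become batches/records. As a minimal standalone sketch of
the new iteration model (illustrative only, assuming the 0.11 org.apache.kafka.common.record
classes; not part of the commit itself):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
    import scala.collection.JavaConverters._

    object RecordIterationSketch extends App {
      // New-style construction: SimpleRecord describes a record, withRecords frames a batch.
      val records = MemoryRecords.withRecords(CompressionType.NONE,
        new SimpleRecord("k1".getBytes, "v1".getBytes),
        new SimpleRecord("k2".getBytes, "v2".getBytes))

      // Shallow view: iterate batches (replaces shallowEntries).
      for (batch <- records.batches.asScala)
        println(s"batch ${batch.baseOffset}..${batch.lastOffset}, magic=${batch.magic}")

      // Deep view: iterate the individual records (replaces deepEntries).
      for (record <- records.records.asScala)
        println(s"record offset=${record.offset}, hasValue=${record.hasValue}")
    }

Under the v2 (KIP-98) format the batch is the unit the idempotent/transactional producer
reasons about, which is why these tests now assert on batch.lastOffset rather than on a
shallow entry's offset.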
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index bb50497..903c394 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -31,102 +31,137 @@ import scala.collection.JavaConverters._
class LogValidatorTest extends JUnitSuite {
@Test
- def testLogAppendTimeNonCompressed() {
+ def testLogAppendTimeNonCompressedV1() {
+ checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
+ }
+
+ private def checkLogAppendTimeNonCompressed(magic: Byte) {
val now = System.currentTimeMillis()
// The timestamps should be overwritten
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.NONE)
+ val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.NONE)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
now = now,
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = magic,
messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
- validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+ assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
+ validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
}
+ @Test
+ def testLogAppendTimeNonCompressedV2() {
+ checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+ }
+
@Test
- def testLogAppendTimeWithRecompression() {
+ def testLogAppendTimeWithRecompressionV1() {
+ checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1)
+ }
+
+ private def checkLogAppendTimeWithRecompression(targetMagic: Byte) {
val now = System.currentTimeMillis()
// The timestamps should be overwritten
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
now = now,
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = targetMagic,
messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
- validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
- assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
+ assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
+ validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+ assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
- assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
- records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
+ records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
}
@Test
- def testLogAppendTimeWithoutRecompression() {
+ def testLogAppendTimeWithRecompressionV2() {
+ checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2)
+ }
+
+ @Test
+ def testLogAppendTimeWithoutRecompressionV1() {
+ checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1)
+ }
+
+ private def checkLogAppendTimeWithoutRecompression(magic: Byte) {
val now = System.currentTimeMillis()
// The timestamps should be overwritten
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.GZIP)
+ val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.GZIP)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
records,
offsetCounter = new LongRef(0),
now = now,
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = magic,
messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- assertEquals("number of messages should not change", records.deepEntries.asScala.size,
- validatedRecords.deepEntries.asScala.size)
- validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
- assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
+ assertEquals("message set size should not change", records.records.asScala.size,
+ validatedRecords.records.asScala.size)
+ validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+ assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
- assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
- records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
+ records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
}
@Test
- def testCreateTimeNonCompressed() {
+ def testLogAppendTimeWithoutRecompressionV2() {
+ checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2)
+ }
+
+ @Test
+ def testCreateTimeNonCompressedV1() {
+ checkCreateTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
+ }
+
+ private def checkCreateTimeNonCompressed(magic: Byte) {
val now = System.currentTimeMillis()
val timestampSeq = Seq(now - 1, now + 1, now)
- val records = MemoryRecords.withRecords(CompressionType.NONE,
- Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
- Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
- Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
+ val records =
+ MemoryRecords.withRecords(magic, CompressionType.NONE,
+ new SimpleRecord(timestampSeq(0), "hello".getBytes),
+ new SimpleRecord(timestampSeq(1), "there".getBytes),
+ new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = magic,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatingResults.validatedRecords
var i = 0
- for (logEntry <- validatedRecords.deepEntries.asScala) {
- assertTrue(logEntry.record.isValid)
- assertEquals(timestampSeq(i), logEntry.record.timestamp)
- assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
- i += 1
+ for (batch <- validatedRecords.batches.asScala) {
+ assertTrue(batch.isValid)
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+ assertEquals(batch.asScala.map(_.timestamp).max, batch.maxTimestamp)
+ for (record <- batch.asScala) {
+ assertTrue(record.isValid)
+ assertEquals(timestampSeq(i), record.timestamp)
+ i += 1
+ }
}
assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
@@ -134,65 +169,118 @@ class LogValidatorTest extends JUnitSuite {
}
@Test
- def testCreateTimeUpConversion() {
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+ def testCreateTimeNonCompressedV2() {
+ checkCreateTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+ }
+
+ @Test
+ def testCreateTimeUpConversionV0ToV1(): Unit = {
+ checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1)
+ }
+
+ private def checkCreateTimeUpConversionFromV0(toMagic: Byte) {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
- offsetCounter = new LongRef(0),
- now = System.currentTimeMillis(),
- sourceCodec = DefaultCompressionCodec,
- targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
- messageTimestampType = TimestampType.CREATE_TIME,
- messageTimestampDiffMaxMs = 1000L)
+ offsetCounter = new LongRef(0),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = toMagic,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
- for (logEntry <- validatedRecords.deepEntries.asScala) {
- assertTrue(logEntry.record.isValid)
- assertEquals(Record.NO_TIMESTAMP, logEntry.record.timestamp)
- assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
+ for (batch <- validatedRecords.batches.asScala) {
+ assertTrue(batch.isValid)
+ assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp)
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
}
- assertEquals(s"Max timestamp should be ${Record.NO_TIMESTAMP}", Record.NO_TIMESTAMP, validatedResults.maxTimestamp)
- assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}",
- validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}", RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestamp)
+ assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
+ validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
}
@Test
- def testCreateTimeCompressed() {
+ def testCreateTimeUpConversionV0ToV2() {
+ checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2)
+ }
+
+ @Test
+ def testCreateTimeUpConversionV1ToV2() {
+ val timestamp = System.currentTimeMillis()
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp)
+ val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(0),
+ now = timestamp,
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 1000L)
+ val validatedRecords = validatedResults.validatedRecords
+
+ for (batch <- validatedRecords.batches.asScala) {
+ assertTrue(batch.isValid)
+ assertEquals(timestamp, batch.maxTimestamp)
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+ }
+ assertEquals(timestamp, validatedResults.maxTimestamp)
+ assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
+ validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+ }
+
+ @Test
+ def testCreateTimeCompressedV1() {
+ checkCreateTimeCompressed(RecordBatch.MAGIC_VALUE_V1)
+ }
+
+ private def checkCreateTimeCompressed(magic: Byte) {
val now = System.currentTimeMillis()
val timestampSeq = Seq(now - 1, now + 1, now)
- val records = MemoryRecords.withRecords(CompressionType.GZIP,
- Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
- Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
- Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
+ val records =
+ MemoryRecords.withRecords(magic, CompressionType.GZIP,
+ new SimpleRecord(timestampSeq(0), "hello".getBytes),
+ new SimpleRecord(timestampSeq(1), "there".getBytes),
+ new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(0),
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = magic,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 1000L)
val validatedRecords = validatedResults.validatedRecords
var i = 0
- for (logEntry <- validatedRecords.deepEntries.asScala) {
- assertTrue(logEntry.record.isValid)
- assertEquals(timestampSeq(i), logEntry.record.timestamp)
- assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
- i += 1
+ for (batch <- validatedRecords.batches.asScala) {
+ assertTrue(batch.isValid)
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+ assertEquals(batch.asScala.map(_.timestamp).max, batch.maxTimestamp)
+ for (record <- batch.asScala) {
+ assertTrue(record.isValid)
+ assertEquals(timestampSeq(i), record.timestamp)
+ i += 1
+ }
}
assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)
- assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}",
- validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+ assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
+ validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
}
+ @Test
+ def testCreateTimeCompressedV2() {
+ checkCreateTimeCompressed(RecordBatch.MAGIC_VALUE_V2)
+ }
+
@Test(expected = classOf[InvalidTimestampException])
- def testInvalidCreateTimeNonCompressed() {
+ def testInvalidCreateTimeNonCompressedV1() {
val now = System.currentTimeMillis()
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L,
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
codec = CompressionType.NONE)
LogValidator.validateMessagesAndAssignOffsets(
records,
@@ -200,15 +288,31 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 1000L)
}
@Test(expected = classOf[InvalidTimestampException])
- def testInvalidCreateTimeCompressed() {
+ def testInvalidCreateTimeNonCompressedV2() {
val now = System.currentTimeMillis()
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L,
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
+ codec = CompressionType.NONE)
+ LogValidator.validateMessagesAndAssignOffsets(
+ records,
+ offsetCounter = new LongRef(0),
+ now = System.currentTimeMillis(),
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 1000L)
+ }
+
+ @Test(expected = classOf[InvalidTimestampException])
+ def testInvalidCreateTimeCompressedV1() {
+ val now = System.currentTimeMillis()
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
codec = CompressionType.GZIP)
LogValidator.validateMessagesAndAssignOffsets(
records,
@@ -216,13 +320,30 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 1000L)
}
+
+ @Test(expected = classOf[InvalidTimestampException])
+ def testInvalidCreateTimeCompressedV2() {
+ val now = System.currentTimeMillis()
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
+ codec = CompressionType.GZIP)
+ LogValidator.validateMessagesAndAssignOffsets(
+ records,
+ offsetCounter = new LongRef(0),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 1000L)
+ }
+
@Test
def testAbsoluteOffsetAssignmentNonCompressed() {
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
val offset = 1234567
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -230,14 +351,14 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V0,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
}
@Test
def testAbsoluteOffsetAssignmentCompressed() {
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
val offset = 1234567
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -245,15 +366,15 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V0,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
}
@Test
- def testRelativeOffsetAssignmentNonCompressed() {
+ def testRelativeOffsetAssignmentNonCompressedV1() {
val now = System.currentTimeMillis()
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.NONE)
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.NONE)
val offset = 1234567
checkOffsets(records, 0)
val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
@@ -261,15 +382,33 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 5000L).validatedRecords
checkOffsets(messageWithOffset, offset)
}
@Test
- def testRelativeOffsetAssignmentCompressed() {
+ def testRelativeOffsetAssignmentNonCompressedV2() {
val now = System.currentTimeMillis()
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.GZIP)
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now, codec = CompressionType.NONE)
+ val offset = 1234567
+ checkOffsets(records, 0)
+ val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 5000L).validatedRecords
+ checkOffsets(messageWithOffset, offset)
+ }
+
+ @Test
+ def testRelativeOffsetAssignmentCompressedV1() {
+ val now = System.currentTimeMillis()
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.GZIP)
val offset = 1234567
checkOffsets(records, 0)
val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
@@ -278,14 +417,125 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 5000L).validatedRecords
checkOffsets(compressedMessagesWithOffset, offset)
}
@Test
- def testOffsetAssignmentAfterMessageFormatConversionV0NonCompressed() {
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+ def testRelativeOffsetAssignmentCompressedV2() {
+ val now = System.currentTimeMillis()
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now, codec = CompressionType.GZIP)
+ val offset = 1234567
+ checkOffsets(records, 0)
+ val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
+ records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 5000L).validatedRecords
+ checkOffsets(compressedMessagesWithOffset, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterUpConversionV0ToV1NonCompressed() {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+ checkOffsets(records, 0)
+ val offset = 1234567
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+ messageTimestampType = TimestampType.LOG_APPEND_TIME,
+ messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterUpConversionV0ToV2NonCompressed() {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+ checkOffsets(records, 0)
+ val offset = 1234567
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.LOG_APPEND_TIME,
+ messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterUpConversionV0ToV1Compressed() {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+ val offset = 1234567
+ checkOffsets(records, 0)
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+ messageTimestampType = TimestampType.LOG_APPEND_TIME,
+ messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterUpConversionV0ToV2Compressed() {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+ val offset = 1234567
+ checkOffsets(records, 0)
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+ messageTimestampType = TimestampType.LOG_APPEND_TIME,
+ messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterDownConversionV1ToV0NonCompressed() {
+ val offset = 1234567
+ val now = System.currentTimeMillis()
+ val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, codec = CompressionType.NONE)
+ checkOffsets(records, 0)
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterDownConversionV1ToV0Compressed() {
+ val offset = 1234567
+ val now = System.currentTimeMillis()
+ val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, CompressionType.GZIP)
+ checkOffsets(records, 0)
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterUpConversionV1ToV2NonCompressed() {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.NONE)
checkOffsets(records, 0)
val offset = 1234567
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -293,14 +543,14 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
}
@Test
- def testOffsetAssignmentAfterMessageFormatConversionV0Compressed() {
- val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+ def testOffsetAssignmentAfterUpConversionV1ToV2Compressed() {
+ val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP)
val offset = 1234567
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -308,39 +558,71 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V1,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
messageTimestampType = TimestampType.LOG_APPEND_TIME,
messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
}
@Test
- def testOffsetAssignmentAfterMessageFormatConversionV1NonCompressed() {
+ def testOffsetAssignmentAfterDownConversionV2ToV1NonCompressed() {
val offset = 1234567
val now = System.currentTimeMillis()
- val records = createRecords(Record.MAGIC_VALUE_V1, now, codec = CompressionType.NONE)
+ val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE)
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
now = System.currentTimeMillis(),
sourceCodec = NoCompressionCodec,
targetCodec = NoCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V0,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
}
@Test
- def testOffsetAssignmentAfterMessageFormatConversionV1Compressed() {
+ def testOffsetAssignmentAfterDownConversionV2ToV1Compressed() {
val offset = 1234567
val now = System.currentTimeMillis()
- val records = createRecords(Record.MAGIC_VALUE_V1, now, CompressionType.GZIP)
+ val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP)
checkOffsets(records, 0)
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
offsetCounter = new LongRef(offset),
now = System.currentTimeMillis(),
sourceCodec = DefaultCompressionCodec,
targetCodec = DefaultCompressionCodec,
- messageFormatVersion = Record.MAGIC_VALUE_V0,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterDownConversionV2ToV0NonCompressed() {
+ val offset = 1234567
+ val now = System.currentTimeMillis()
+ val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE)
+ checkOffsets(records, 0)
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = NoCompressionCodec,
+ targetCodec = NoCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
+ messageTimestampType = TimestampType.CREATE_TIME,
+ messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+ }
+
+ @Test
+ def testOffsetAssignmentAfterDownConversionV2ToV0Compressed() {
+ val offset = 1234567
+ val now = System.currentTimeMillis()
+ val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP)
+ checkOffsets(records, 0)
+ checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+ offsetCounter = new LongRef(offset),
+ now = System.currentTimeMillis(),
+ sourceCodec = DefaultCompressionCodec,
+ targetCodec = DefaultCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
}
@@ -354,33 +636,27 @@ class LogValidatorTest extends JUnitSuite {
now = System.currentTimeMillis(),
sourceCodec = SnappyCompressionCodec,
targetCodec = SnappyCompressionCodec,
+ messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
messageTimestampType = TimestampType.CREATE_TIME,
messageTimestampDiffMaxMs = 5000L)
}
private def createRecords(magicValue: Byte = Message.CurrentMagicValue,
- timestamp: Long = Message.NoTimestamp,
- codec: CompressionType = CompressionType.NONE): MemoryRecords = {
- if (magicValue == Record.MAGIC_VALUE_V0) {
- MemoryRecords.withRecords(
- codec,
- Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "hello".getBytes),
- Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "there".getBytes),
- Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "beautiful".getBytes))
- } else {
- MemoryRecords.withRecords(
- codec,
- Record.create(Record.MAGIC_VALUE_V1, timestamp, "hello".getBytes),
- Record.create(Record.MAGIC_VALUE_V1, timestamp, "there".getBytes),
- Record.create(Record.MAGIC_VALUE_V1, timestamp, "beautiful".getBytes))
- }
+ timestamp: Long = Message.NoTimestamp,
+ codec: CompressionType = CompressionType.NONE): MemoryRecords = {
+ val buf = ByteBuffer.allocate(512)
+ val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+ builder.appendWithOffset(0, timestamp, null, "hello".getBytes)
+ builder.appendWithOffset(1, timestamp, null, "there".getBytes)
+ builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes)
+ builder.build()
}
/* check that offsets are assigned consecutively from the given base offset */
- private def checkOffsets(records: MemoryRecords, baseOffset: Long) {
- assertTrue("Message set should not be empty", records.deepEntries.asScala.nonEmpty)
+ private def checkOffsets(records: MemoryRecords, baseOffset: Long) {
+ assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)
var offset = baseOffset
- for (entry <- records.deepEntries.asScala) {
+ for (entry <- records.records.asScala) {
assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
offset += 1
}
@@ -388,28 +664,32 @@ class LogValidatorTest extends JUnitSuite {
private def recordsWithInvalidInnerMagic(initialOffset: Long): MemoryRecords = {
val records = (0 until 20).map(id =>
- Record.create(Record.MAGIC_VALUE_V0,
- Record.NO_TIMESTAMP,
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0,
+ RecordBatch.NO_TIMESTAMP,
id.toString.getBytes,
id.toString.getBytes))
val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
- val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.GZIP,
- TimestampType.CREATE_TIME)
+ val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP,
+ TimestampType.CREATE_TIME, 0L)
var offset = initialOffset
records.foreach { record =>
- builder.appendUnchecked(offset, record)
+ builder.appendUncheckedWithOffset(offset, record)
offset += 1
}
builder.build()
}
- def validateLogAppendTime(now: Long, record: Record) {
- record.ensureValid()
- assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
- assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType)
+ def validateLogAppendTime(now: Long, batch: RecordBatch) {
+ assertTrue(batch.isValid)
+ assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType)
+ assertEquals(s"Timestamp of message $batch should be $now", now, batch.maxTimestamp)
+ for (record <- batch.asScala) {
+ assertTrue(record.isValid)
+ assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
+ }
}
}
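The rewritten createRecords helper above shows the second recurring idiom in this file:
rather than Record.create per message, tests drive MemoryRecords.builder with an explicit
magic value, timestamp type and base offset, then assign offsets via appendWithOffset. A
short self-contained sketch of that flow (assuming the same builder overload the diff uses;
the object name is illustrative):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, TimestampType}
    import scala.collection.JavaConverters._

    object BuilderSketch extends App {
      val buf = ByteBuffer.allocate(512)
      // Magic, codec, timestamp type and base offset are all pinned up front.
      val builder = MemoryRecords.builder(buf, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
        TimestampType.CREATE_TIME, 0L)
      val now = System.currentTimeMillis()
      // Offsets are assigned explicitly, mirroring createRecords in the test.
      builder.appendWithOffset(0L, now, null, "hello".getBytes)
      builder.appendWithOffset(1L, now, null, "there".getBytes)
      builder.appendWithOffset(2L, now, null, "beautiful".getBytes)
      val records = builder.build()
      records.records.asScala.foreach(r => println(s"${r.offset} @ ${r.timestamp}"))
    }

Because the magic value is a plain parameter, each V1 test above becomes a one-line V2
variant that reuses the same check helper.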
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index b7d2fa1..40581ed 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -124,7 +124,8 @@ trait BaseMessageSetTestCases extends JUnitSuite {
val written = write(channel)
assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
val fileRecords = new FileRecords(file, channel, 0, Integer.MAX_VALUE, false)
- assertEquals(set.asRecords.deepEntries.asScala.toVector, fileRecords.deepEntries.asScala.toVector)
+ assertEquals(set.asRecords.records.asScala.toVector, fileRecords.records.asScala.toVector)
+ checkEquals(set.asRecords.records.iterator, fileRecords.records.iterator)
} finally channel.close()
}
}
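The assertion rewrite above compares the record-level view of a memory-backed set against its file-backed copy. A rough round-trip sketch, assuming the FileRecords constructor shown in the hunk, that MemoryRecords.buffer exposes the serialized bytes (it is used that way elsewhere in this commit), and that the returned buffer is positioned at the start of the data:

    import java.io.RandomAccessFile
    import java.nio.file.Files
    import org.apache.kafka.common.record._
    import scala.collection.JavaConverters._

    val file = Files.createTempFile("records", ".log").toFile
    val channel = new RandomAccessFile(file, "rw").getChannel
    val set = MemoryRecords.withRecords(CompressionType.NONE,
      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes))
    // write the serialized form; duplicate() so the set's own buffer state is untouched
    channel.write(set.buffer.duplicate())
    val fileRecords = new FileRecords(file, channel, 0, Integer.MAX_VALUE, false)
    // the same per-record sequence should come back from disk
    assert(set.records.asScala.toVector == fileRecords.records.asScala.toVector)
    channel.close()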
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 17056c9..ed873d0 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, NetworkSend}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
@@ -117,7 +117,8 @@ class SocketServerTest extends JUnitSuite {
val ackTimeoutMs = 10000
val ack = 0: Short
- val emptyRequest = new ProduceRequest.Builder(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build()
+ val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+ new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
val byteBuffer = emptyRequest.serialize(emptyHeader)
byteBuffer.rewind()
@@ -287,7 +288,8 @@ class SocketServerTest extends JUnitSuite {
val clientId = ""
val ackTimeoutMs = 10000
val ack = 0: Short
- val emptyRequest = new ProduceRequest.Builder(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build()
+ val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+ new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
val byteBuffer = emptyRequest.serialize(emptyHeader)
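Both hunks in this file make the same mechanical change: ProduceRequest.Builder now takes the magic value of the record format as its first argument. A minimal sketch of building and serializing an empty produce request with the new signature, using only calls visible in this diff:

    import java.util.HashMap
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
    import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}

    val ack = 0: Short
    // the magic value is now explicit, pinned here to the current (v2) format
    val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, 10000,
      new HashMap[TopicPartition, MemoryRecords]()).build()
    val header = new RequestHeader(ApiKeys.PRODUCE.id, emptyRequest.version, "clientId", 0)
    val byteBuffer = emptyRequest.serialize(header)
    byteBuffer.rewind()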
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 189e21b..41a8a6c 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -29,6 +29,7 @@ import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.record.{DefaultRecordBatch, DefaultRecord}
import org.junit.Test
import org.junit.Assert._
@@ -119,7 +120,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
assertEquals(Errors.MESSAGE_TOO_LARGE, response1.status(TopicAndPartition("test", 0)).error)
assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
- val safeSize = configs.head.messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
+ val safeSize = configs.head.messageMaxBytes - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD
val message2 = new Message(new Array[Byte](safeSize))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
@@ -129,7 +130,6 @@ class SyncProducerTest extends KafkaServerTestHarness {
assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
}
-
@Test
def testMessageSizeTooLargeWithAckZero() {
val server = servers.head
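The size headroom in the hunk above switches from the old per-message constants to the v2 batch constants. A sketch of the arithmetic, assuming (as the test does) that RECORD_BATCH_OVERHEAD is the fixed v2 batch header size and MAX_RECORD_OVERHEAD is the worst-case per-record framing:

    import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}

    // the largest single value guaranteed to fit under the broker's
    // message.max.bytes: strip the fixed batch header, then the per-record
    // overhead upper bound (varint fields at their maximum widths)
    def safeValueSize(messageMaxBytes: Int): Int =
      messageMaxBytes - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD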
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index a5d8102..581a917 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -23,8 +23,7 @@ import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.ByteUtils
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.{Before, Test}
@@ -156,8 +155,8 @@ class AbstractFetcherThreadTest {
@volatile var fetchCount = 0
private val normalPartitionDataSet = List(
- new TestPartitionData(MemoryRecords.withRecords(0L, Record.create("hello".getBytes()))),
- new TestPartitionData(MemoryRecords.withRecords(1L, Record.create("hello".getBytes())))
+ new TestPartitionData(MemoryRecords.withRecords(0L, CompressionType.NONE, new SimpleRecord("hello".getBytes()))),
+ new TestPartitionData(MemoryRecords.withRecords(1L, CompressionType.NONE, new SimpleRecord("hello".getBytes())))
)
override def processPartitionData(topicPartition: TopicPartition,
@@ -171,9 +170,9 @@ class AbstractFetcherThreadTest {
// Now check message's crc
val records = partitionData.toRecords
- for (entry <- records.shallowEntries.asScala) {
- entry.record.ensureValid()
- logEndOffset = entry.nextOffset
+ for (batch <- records.batches.asScala) {
+ batch.ensureValid()
+ logEndOffset = batch.nextOffset
}
}
@@ -181,15 +180,18 @@ class AbstractFetcherThreadTest {
fetchCount += 1
// Set the first fetch to get a corrupted message
if (fetchCount == 1) {
- val corruptedRecord = Record.create("hello".getBytes())
- val badChecksum = (corruptedRecord.checksum + 1 % Int.MaxValue).toInt
- // Garble checksum
- ByteUtils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
- val records = MemoryRecords.withRecords(corruptedRecord)
+ val record = new SimpleRecord("hello".getBytes())
+ val records = MemoryRecords.withRecords(CompressionType.NONE, record)
+ val buffer = records.buffer
+
+ // flip some bits in the message to ensure the crc fails
+ buffer.putInt(15, buffer.getInt(15) ^ 23422)
+ buffer.putInt(30, buffer.getInt(30) ^ 93242)
fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq
- } else
- // Then, the following fetches get the normal data
+ } else {
+ // Then, the following fetches get the normal data
fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq
+ }
}
override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = {
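The corruption trick above no longer garbles a checksum field directly; it flips bits inside the serialized batch and lets ensureValid() surface the CRC mismatch. A self-contained sketch of the same technique, assuming only the diff's own calls (the exact exception type raised is version-dependent, so it is caught broadly here):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
    import scala.collection.JavaConverters._

    val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("hello".getBytes))
    val buffer = records.buffer

    // flip some bits in the serialized form so the batch CRC no longer matches
    buffer.putInt(15, buffer.getInt(15) ^ 23422)
    buffer.putInt(30, buffer.getInt(30) ^ 93242)

    for (batch <- records.batches.asScala) {
      try {
        batch.ensureValid()
        println("unexpected: batch still valid")
      } catch {
        case e: Exception => println(s"crc check failed as expected: $e")
      }
    }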
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 2d4a22a..0deb26d 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.types.Type
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, RecordBatch, MemoryRecords}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, ResponseHeader}
import org.junit.Assert._
import org.junit.Test
@@ -116,10 +116,11 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
val version = 2: Short
val serializedBytes = {
- val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null, correlationId)
- val messageBytes = "message".getBytes
- val records = MemoryRecords.readableRecords(ByteBuffer.wrap(messageBytes))
- val request = new ProduceRequest.Builder(1, 10000, Map(topicPartition -> records).asJava).build()
+ val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.latestVersion, null,
+ correlationId)
+ val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
+ val request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 10000,
+ Map(topicPartition -> records).asJava).build()
val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf)
byteBuffer.put(headerBytes)
request.toStruct.writeTo(byteBuffer)
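The hunk hand-rolls the wire framing rather than going through a client: header bytes first, then the request struct, in a buffer sized exactly from sizeOf. The same pattern factored into a helper, under the assumption that AbstractRequest.toStruct is the serialization entry point used throughout this test (as it is in the lines above):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.requests.AbstractRequest

    def frame(headerBytes: Array[Byte], request: AbstractRequest): ByteBuffer = {
      val struct = request.toStruct
      // allocate exactly what the header plus the request struct need
      val buffer = ByteBuffer.allocate(headerBytes.length + struct.sizeOf)
      buffer.put(headerBytes)
      struct.writeTo(buffer)
      buffer.rewind()
      buffer
    }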
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 5616956..b350732 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.LogEntry
+import org.apache.kafka.common.record.Record
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.Assert._
@@ -53,7 +53,6 @@ class FetchRequestTest extends BaseRequestTest {
super.tearDown()
}
-
private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest =
FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
@@ -68,9 +67,10 @@ class FetchRequestTest extends BaseRequestTest {
partitionMap
}
- private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
+ private def sendFetchRequest(leaderId: Int, request: FetchRequest,
+ version: Short = ApiKeys.FETCH.latestVersion): FetchResponse = {
val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
- FetchResponse.parse(response, ApiKeys.FETCH.latestVersion)
+ FetchResponse.parse(response, version)
}
@Test
@@ -121,13 +121,13 @@ class FetchRequestTest extends BaseRequestTest {
val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq)
val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData =>
- logEntries(partitionData).map(_.sizeInBytes).sum
+ records(partitionData).map(_.sizeInBytes).sum
}.sum
assertTrue(responseSize3 <= maxResponseBytes)
val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1)
assertEquals(Errors.NONE, partitionData3.error)
assertTrue(partitionData3.highWatermark > 0)
- val size3 = logEntries(partitionData3).map(_.sizeInBytes).sum
+ val size3 = records(partitionData3).map(_.sizeInBytes).sum
assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 <= maxResponseBytes)
assertTrue(s"Expected $size3 to be larger than $maxPartitionBytes", size3 > maxPartitionBytes)
assertTrue(maxPartitionBytes < partitionData3.records.sizeInBytes)
@@ -139,13 +139,13 @@ class FetchRequestTest extends BaseRequestTest {
val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq)
val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect {
- case (tp, partitionData) if logEntries(partitionData).map(_.sizeInBytes).sum > 0 => tp
+ case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp
}
assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2)
assertEquals(Errors.NONE, partitionData4.error)
assertTrue(partitionData4.highWatermark > 0)
- val size4 = logEntries(partitionData4).map(_.sizeInBytes).sum
+ val size4 = records(partitionData4).map(_.sizeInBytes).sum
assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > maxResponseBytes)
assertTrue(maxResponseBytes < partitionData4.records.sizeInBytes)
}
@@ -158,16 +158,16 @@ class FetchRequestTest extends BaseRequestTest {
"key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
createPartitionMap(maxPartitionBytes, Seq(topicPartition))).build(2)
- val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest, version = 2)
val partitionData = fetchResponse.responseData.get(topicPartition)
assertEquals(Errors.NONE, partitionData.error)
assertTrue(partitionData.highWatermark > 0)
assertEquals(maxPartitionBytes, partitionData.records.sizeInBytes)
- assertEquals(0, logEntries(partitionData).map(_.sizeInBytes).sum)
+ assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
}
- private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = {
- partitionData.records.deepEntries.asScala.toIndexedSeq
+ private def records(partitionData: FetchResponse.PartitionData): Seq[Record] = {
+ partitionData.records.records.asScala.toIndexedSeq
}
private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
@@ -185,23 +185,22 @@ class FetchRequestTest extends BaseRequestTest {
val records = partitionData.records
responseBufferSize += records.sizeInBytes
- val entries = records.shallowEntries.asScala.toIndexedSeq
- assertTrue(entries.size < numMessagesPerPartition)
- val entriesSize = entries.map(_.sizeInBytes).sum
- responseSize += entriesSize
- if (entriesSize == 0 && !emptyResponseSeen) {
+ val batches = records.batches.asScala.toIndexedSeq
+ assertTrue(batches.size < numMessagesPerPartition)
+ val batchesSize = batches.map(_.sizeInBytes).sum
+ responseSize += batchesSize
+ if (batchesSize == 0 && !emptyResponseSeen) {
assertEquals(0, records.sizeInBytes)
emptyResponseSeen = true
}
- else if (entriesSize != 0 && !emptyResponseSeen) {
- assertTrue(entriesSize <= maxPartitionBytes)
+ else if (batchesSize != 0 && !emptyResponseSeen) {
+ assertTrue(batchesSize <= maxPartitionBytes)
assertEquals(maxPartitionBytes, records.sizeInBytes)
}
- else if (entriesSize != 0 && emptyResponseSeen)
- fail(s"Expected partition with size 0, but found $tp with size $entriesSize")
+ else if (batchesSize != 0 && emptyResponseSeen)
+ fail(s"Expected partition with size 0, but found $tp with size $batchesSize")
else if (records.sizeInBytes != 0 && emptyResponseSeen)
fail(s"Expected partition buffer with size 0, but found $tp with size ${records.sizeInBytes}")
-
}
assertEquals(maxResponseBytes - maxResponseBytes % maxPartitionBytes, responseBufferSize)
@@ -227,7 +226,7 @@ class FetchRequestTest extends BaseRequestTest {
val suffix = s"$tp-$messageIndex"
new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value $suffix")
}
- records.map(producer.send).foreach(_.get)
+ records.map(producer.send(_).get)
records
}
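The last hunk inlines the wait on each produce: the future returned by send is awaited as it is mapped, so every record is acknowledged before the method returns. A minimal sketch of that pattern against a broker (the bootstrap address is an assumption for illustration):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed local broker
    val producer = new KafkaProducer(props, new StringSerializer, new StringSerializer)
    val records = (0 until 3).map(i => new ProducerRecord[String, String]("topic", s"key $i", s"value $i"))
    // map + get: block on each future, collecting the resulting metadata
    val metadata = records.map(producer.send(_).get)
    producer.close()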
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index ac766dc..9386a1d 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -31,7 +31,6 @@ import kafka.utils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record}
import org.apache.kafka.common.utils.{Time, Utils}
import org.easymock.{EasyMock, IAnswer}
import org.junit.Assert._
@@ -42,7 +41,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
var logDir: File = null
var topicLogDir: File = null
var server: KafkaServer = null
- var logSize: Int = 100
+ var logSize: Int = 140
var simpleConsumer: SimpleConsumer = null
var time: Time = new MockTime()
@@ -90,9 +89,8 @@ class LogOffsetTest extends ZooKeeperTestHarness {
"Log for partition [topic,0] should be created")
val log = logManager.getLog(new TopicPartition(topic, part)).get
- val record = Record.create(Integer.toString(42).getBytes())
for (_ <- 0 until 20)
- log.append(MemoryRecords.withRecords(record))
+ log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
log.flush()
val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
@@ -151,9 +149,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val logManager = server.getLogManager
val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
- val record = Record.create(Integer.toString(42).getBytes())
+
for (_ <- 0 until 20)
- log.append(MemoryRecords.withRecords(record))
+ log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
log.flush()
val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
@@ -180,9 +178,8 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val logManager = server.getLogManager
val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
- val record = Record.create(Integer.toString(42).getBytes())
for (_ <- 0 until 20)
- log.append(MemoryRecords.withRecords(record))
+ log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
log.flush()
val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10)
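Each append site above now goes through TestUtils.singletonRecords instead of wrapping a hand-built Record. A plausible stand-in for that helper, using only the public calls this commit introduces (the real TestUtils signature may carry more defaults; this is an assumption about its effect):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    // wrap one value in a single-record batch, ready for Log.append
    def singletonRecords(value: Array[Byte]): MemoryRecords =
      MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(value))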
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 81118fa..2f16719 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -20,7 +20,7 @@ package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, DefaultRecordBatch, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
import org.junit.Test
@@ -41,7 +41,7 @@ class ProduceRequestTest extends BaseRequestTest {
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader,
- new ProduceRequest.Builder(-1, 3000, partitionRecords.asJava).build())
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
@@ -51,12 +51,12 @@ class ProduceRequestTest extends BaseRequestTest {
partitionResponse
}
- sendAndCheck(MemoryRecords.withRecords(
- Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
+ sendAndCheck(MemoryRecords.withRecords(CompressionType.NONE,
+ new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
sendAndCheck(MemoryRecords.withRecords(CompressionType.GZIP,
- Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
- Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
+ new SimpleRecord(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
+ new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}
/* returns a pair of partition id and leader id */
@@ -72,13 +72,14 @@ class ProduceRequestTest extends BaseRequestTest {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
val timestamp = 1000000
val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
- Record.create(timestamp, "key".getBytes, "value".getBytes))
- // Change the lz4 checksum value so that it doesn't match the contents
- memoryRecords.buffer.array.update(40, 0)
+ new SimpleRecord(timestamp, "key".getBytes, "value".getBytes))
+ // Change the lz4 checksum value (not the kafka record crc) so that it doesn't match the contents
+ val lz4ChecksumOffset = 6
+ memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset, 0)
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader,
- new ProduceRequest.Builder(-1, 3000, partitionRecords.asJava).build())
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
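Why RECORD_BATCH_OVERHEAD + 6 lands on the LZ4 checksum: in a v2 batch the compressed payload begins immediately after the fixed batch header, and in an LZ4 frame with the content-size flag unset the header is magic(4) + FLG(1) + BD(1) followed by the one-byte header checksum. That frame layout is an assumption from the LZ4 frame spec, not stated in the diff:

    import org.apache.kafka.common.record.DefaultRecordBatch

    // byte 6 of the LZ4 frame is its header-checksum byte (HC), assuming no
    // content-size field; zeroing it breaks the frame, not the Kafka batch CRC
    val lz4ChecksumOffset = 6
    val corruptionPosition = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset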