You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/24 19:43:56 UTC
[03/11] kafka git commit: KAFKA-4816;
Message format changes for idempotent/transactional producer (KIP-98)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8315e64..038c15b 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -27,7 +27,7 @@ import kafka.serializer.Decoder
import kafka.utils._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.record.{CompressionType, FileRecords, LogEntry, Record}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import scala.collection.mutable
@@ -131,7 +131,7 @@ object DumpLogSegments {
verifyOnly: Boolean,
misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]],
maxMessageSize: Int) {
- val startOffset = file.getName().split("\\.")(0).toLong
+ val startOffset = file.getName.split("\\.")(0).toLong
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
val fileRecords = FileRecords.open(logFile, false)
val index = new OffsetIndex(file, baseOffset = startOffset)
@@ -146,10 +146,10 @@ object DumpLogSegments {
for(i <- 0 until index.entries) {
val entry = index.entry(i)
val slice = fileRecords.read(entry.position, maxMessageSize)
- val logEntry = getIterator(slice.shallowEntries.iterator.next(), isDeepIteration = true).next()
- if (logEntry.offset != entry.offset + index.baseOffset) {
+ val firstRecord = slice.records.iterator.next()
+ if (firstRecord.offset != entry.offset + index.baseOffset) {
var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
- misMatchesSeq ::=(entry.offset + index.baseOffset, logEntry.offset)
+ misMatchesSeq ::=(entry.offset + index.baseOffset, firstRecord.offset)
misMatchesForIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
}
// since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -179,26 +179,23 @@ object DumpLogSegments {
return
}
- var prevTimestamp = Record.NO_TIMESTAMP
+ var prevTimestamp = RecordBatch.NO_TIMESTAMP
for(i <- 0 until timeIndex.entries) {
val entry = timeIndex.entry(i)
val position = index.lookup(entry.offset + timeIndex.baseOffset).position
val partialFileRecords = fileRecords.read(position, Int.MaxValue)
- val shallowEntries = partialFileRecords.shallowEntries.asScala
- var maxTimestamp = Record.NO_TIMESTAMP
+ val batches = partialFileRecords.batches.asScala
+ var maxTimestamp = RecordBatch.NO_TIMESTAMP
// We first find the message by offset then check if the timestamp is correct.
- val maybeLogEntry = shallowEntries.find(_.offset >= entry.offset + timeIndex.baseOffset)
- maybeLogEntry match {
+ batches.find(_.lastOffset >= entry.offset + timeIndex.baseOffset) match {
case None =>
timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
-1.toLong)
- case Some(logEntry) if logEntry.offset != entry.offset + timeIndex.baseOffset =>
- timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
- logEntry.offset)
- case Some(shallowLogEntry) =>
- val deepIter = getIterator(shallowLogEntry, isDeepIteration = true)
- for (deepLogEntry <- deepIter)
- maxTimestamp = math.max(maxTimestamp, deepLogEntry.record.timestamp)
+ case Some(batch) if batch.lastOffset != entry.offset + timeIndex.baseOffset =>
+ timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, batch.lastOffset)
+ case Some(batch) =>
+ for (record <- batch.asScala)
+ maxTimestamp = math.max(maxTimestamp, record.timestamp)
if (maxTimestamp != entry.timestamp)
timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp)
@@ -222,7 +219,7 @@ object DumpLogSegments {
private class DecoderMessageParser[K, V](keyDecoder: Decoder[K], valueDecoder: Decoder[V]) extends MessageParser[K, V] {
override def parse(record: Record): (Option[K], Option[V]) = {
- if (record.hasNullValue) {
+ if (!record.hasValue) {
(None, None)
} else {
val key = if (record.hasKey)
@@ -285,7 +282,7 @@ object DumpLogSegments {
}
override def parse(record: Record): (Option[String], Option[String]) = {
- if (record.hasNullValue)
+ if (!record.hasValue)
(None, None)
else if (!record.hasKey) {
throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)")
@@ -306,54 +303,65 @@ object DumpLogSegments {
isDeepIteration: Boolean,
maxMessageSize: Int,
parser: MessageParser[_, _]) {
- val startOffset = file.getName().split("\\.")(0).toLong
+ val startOffset = file.getName.split("\\.")(0).toLong
println("Starting offset: " + startOffset)
val messageSet = FileRecords.open(file, false)
var validBytes = 0L
- var lastOffset = -1l
- for (shallowLogEntry <- messageSet.shallowEntries(maxMessageSize).asScala) {
- val itr = getIterator(shallowLogEntry, isDeepIteration)
- for (deepLogEntry <- itr) {
- val record = deepLogEntry.record()
-
- if(lastOffset == -1)
- lastOffset = deepLogEntry.offset
- // If we are iterating uncompressed messages, offsets must be consecutive
- else if (record.compressionType == CompressionType.NONE && deepLogEntry.offset != lastOffset +1) {
- var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
- nonConsecutivePairsSeq ::=(lastOffset, deepLogEntry.offset)
- nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
+ var lastOffset = -1L
+ val batches = messageSet.batches(maxMessageSize).asScala
+ for (batch <- batches) {
+ if (isDeepIteration) {
+ for (record <- batch.asScala) {
+ if (lastOffset == -1)
+ lastOffset = record.offset
+ else if (record.offset != lastOffset + 1) {
+ var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
+ nonConsecutivePairsSeq ::= (lastOffset, record.offset)
+ nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
+ }
+ lastOffset = record.offset
+
+ print("offset: " + record.offset + " position: " + validBytes +
+ " " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
+ " keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic +
+ " compresscodec: " + batch.compressionType + " crc: " + record.checksum)
+
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+ print(" sequence: " + record.sequence +
+ " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+ }
+
+ if (record.isControlRecord) {
+ val controlType = ControlRecordType.parse(record.key)
+ print(s" controlType: $controlType")
+ } else if (printContents) {
+ val (key, payload) = parser.parse(record)
+ key.foreach(key => print(s" key: $key"))
+ payload.foreach(payload => print(s" payload: $payload"))
+ }
+ println()
}
- lastOffset = deepLogEntry.offset
-
- print("offset: " + deepLogEntry.offset + " position: " + validBytes +
- " " + record.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
- " payloadsize: " + record.valueSize + " magic: " + record.magic +
- " compresscodec: " + record.compressionType + " crc: " + record.checksum)
- if (record.hasKey)
- print(" keysize: " + record.keySize)
- if (printContents) {
- val (key, payload) = parser.parse(record)
- key.foreach(key => print(s" key: $key"))
- payload.foreach(payload => print(s" payload: $payload"))
- }
- println()
- }
+ } else {
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+ print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
+ " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
+ " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
+ " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional)
+ else
+ print("offset: " + batch.lastOffset)
- validBytes += shallowLogEntry.sizeInBytes
+ println(" position: " + validBytes + " " + batch.timestampType + ": " + batch.maxTimestamp +
+ " isvalid: " + batch.isValid +
+ " size: " + batch.sizeInBytes + " magic: " + batch.magic +
+ " compresscodec: " + batch.compressionType + " crc: " + batch.checksum)
+ }
+ validBytes += batch.sizeInBytes
}
val trailingBytes = messageSet.sizeInBytes - validBytes
if(trailingBytes > 0)
println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
}
- private def getIterator(logEntry: LogEntry, isDeepIteration: Boolean): Iterator[LogEntry] = {
- if (isDeepIteration)
- logEntry.iterator.asScala
- else
- Iterator(logEntry)
- }
-
class TimeIndexDumpErrors {
val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a2866ad..d359c1a 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -38,12 +38,12 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
-import org.apache.kafka.common.record.Record
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
+import org.apache.kafka.common.record.RecordBatch
/**
* The mirror maker has the following architecture:
@@ -752,7 +752,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
- val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
+ val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 492b304..c9c2e44 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -34,9 +34,6 @@ import kafka.utils._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
-import scala.collection.JavaConverters._
-
-
/**
* For verifying the consistency among replicas.
*
@@ -274,8 +271,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
"fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
+ expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
- val logEntryIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
- replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.iterator
+ val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+ replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.batches.iterator
}
val maxHw = fetchResponsePerReplica.values.map(_.hw).max
@@ -283,32 +280,32 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
var isMessageInAllReplicas = true
while (isMessageInAllReplicas) {
var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
- for ((replicaId, logEntriesIterator) <- logEntryIteratorMap) {
+ for ((replicaId, recordBatchIterator) <- recordBatchIteratorMap) {
try {
- if (logEntriesIterator.hasNext) {
- val logEntry = logEntriesIterator.next()
+ if (recordBatchIterator.hasNext) {
+ val batch = recordBatchIterator.next()
// only verify up to the high watermark
- if (logEntry.offset >= fetchResponsePerReplica.get(replicaId).hw)
+ if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).hw)
isMessageInAllReplicas = false
else {
messageInfoFromFirstReplicaOpt match {
case None =>
messageInfoFromFirstReplicaOpt = Some(
- MessageInfo(replicaId, logEntry.offset, logEntry.nextOffset, logEntry.record.checksum))
+ MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
case Some(messageInfoFromFirstReplica) =>
- if (messageInfoFromFirstReplica.offset != logEntry.offset) {
+ if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
+ ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+ messageInfoFromFirstReplica.offset + " doesn't match replica "
- + replicaId + "'s offset " + logEntry.offset)
+ + replicaId + "'s offset " + batch.lastOffset)
Exit.exit(1)
}
- if (messageInfoFromFirstReplica.checksum != logEntry.record.checksum)
+ if (messageInfoFromFirstReplica.checksum != batch.checksum)
println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
- + topicAndPartition + " has unmatched checksum at offset " + logEntry.offset + "; replica "
+ + topicAndPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
+ messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
- + "; replica " + replicaId + "'s checksum " + logEntry.record.checksum)
+ + "; replica " + replicaId + "'s checksum " + batch.checksum)
}
}
} else
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 469fbff..35bdded 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -25,8 +25,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.Test
class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
@@ -45,6 +44,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
val nodeApiVersions = NodeApiVersions.create
for (apiKey <- ApiKeys.values) {
val apiVersion = nodeApiVersions.apiVersion(apiKey)
+ assertNotNull(apiVersion)
val versionRangeStr =
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 22efa1f..cb4c235 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.KafkaException
import kafka.admin.AdminUtils
import kafka.network.SocketServer
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, RecordBatch, MemoryRecords}
class AuthorizerIntegrationTest extends BaseRequestTest {
@@ -183,8 +183,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createProduceRequest = {
- new requests.ProduceRequest.Builder(1, 5000,
- collection.mutable.Map(tp -> MemoryRecords.readableRecords(ByteBuffer.wrap("test".getBytes))).asJava).
+ new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+ collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava).
build()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 083dbf0..09b715d 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -51,12 +51,12 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
def getGroupMetadataLogOpt: Option[Log] =
logManager.getLog(new TopicPartition(Topic.GroupMetadataTopicName, 0))
- TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowEntries.asScala.nonEmpty)),
+ TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.batches.asScala.nonEmpty)),
"Commit message not appended in time")
val logSegments = getGroupMetadataLogOpt.get.logSegments
val incorrectCompressionCodecs = logSegments
- .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType))
+ .flatMap(_.log.batches.asScala.map(_.compressionType))
.filter(_ != offsetsTopicCompressionCodec)
assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 50941ce..9ec544c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -648,8 +648,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
"value".getBytes)
val largeRecord = new ProducerRecord(tp.topic(), tp.partition(), "large".getBytes,
new Array[Byte](largeProducerRecordSize))
- this.producers.head.send(smallRecord)
- this.producers.head.send(largeRecord)
+
+ this.producers.head.send(smallRecord).get
+ this.producers.head.send(largeRecord).get
// we should only get the small record in the first `poll`
consumer0.assign(List(tp).asJava)
@@ -979,6 +980,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(s"value will not be modified", new String(record.value()))
}
+ @Test
def testConsumeMessagesWithCreateTime() {
val numRecords = 50
// Test non-compressed messages
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index ce81cae..7372ac4 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -120,7 +121,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
TestUtils.createTopic(zkUtils, topic10, servers.size, numServers, servers, topicConfig)
// send a record that is too large for replication, but within the broker max message limit
- val record = new ProducerRecord(topic10, null, "key".getBytes, new Array[Byte](maxMessageSize - 50))
+ val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic10, null, value)
val recordMetadata = producer3.send(record).get
assertEquals(topic10, recordMetadata.topic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 853dad6..7870485 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -91,7 +91,8 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
- (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, -1, null)))
+ (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, null)))
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index 4788b4a..c0f385f 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -21,7 +21,7 @@ import kafka.api.FetchResponsePartitionData
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
import org.junit.Test
import org.junit.Assert.assertTrue
@@ -41,10 +41,10 @@ class ReplicaVerificationToolTest {
expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
(0 until numReplicas).foreach { replicaId =>
val records = (0 to 5).map { index =>
- Record.create(s"key $index".getBytes, s"value $index".getBytes)
+ new SimpleRecord(s"key $index".getBytes, s"value $index".getBytes)
}
val initialOffset = 4
- val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*)
+ val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*)
replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20,
new ByteBufferMessageSet(memoryRecords.buffer)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 5d889d5..ca03ba8 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -140,13 +140,13 @@ object TestLogCleaning {
require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
val fileRecords = FileRecords.open(new File(dir, file))
- for (entry <- fileRecords.shallowEntries.asScala) {
- val key = TestUtils.readString(entry.record.key)
- val content =
- if(entry.record.hasNullValue)
+ for (entry <- fileRecords.records.asScala) {
+ val key = TestUtils.readString(entry.key)
+ val content =
+ if (!entry.hasValue)
null
else
- TestUtils.readString(entry.record.value)
+ TestUtils.readString(entry.value)
println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index a67c166..aff3b2f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -98,8 +98,8 @@ object StressTestLog {
try {
log.read(offset, 1024, Some(offset+1)).records match {
case read: FileRecords if read.sizeInBytes > 0 => {
- val first = read.shallowEntries.iterator.next()
- require(first.offset == offset, "We should either read nothing or the message we asked for.")
+ val first = read.batches.iterator.next()
+ require(first.lastOffset == offset, "We should either read nothing or the message we asked for.")
require(first.sizeInBytes == read.sizeInBytes, "Expected %d but got %d.".format(first.sizeInBytes, read.sizeInBytes))
offset += 1
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 90236bb..9c29679 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -26,7 +26,7 @@ import joptsimple._
import kafka.log._
import kafka.message._
import kafka.utils._
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import scala.math._
@@ -102,8 +102,12 @@ object TestLinearWriteSpeed {
val rand = new Random
rand.nextBytes(buffer.array)
val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
- val messageSet = MemoryRecords.withRecords(CompressionType.forId(compressionCodec.codec),
- (0 until numMessages).map(_ => Record.create(new Array[Byte](messageSize))): _*)
+ val createTime = System.currentTimeMillis
+ val messageSet = {
+ val compressionType = CompressionType.forId(compressionCodec.codec)
+ val records = (0 until numMessages).map(_ => new SimpleRecord(createTime, null, new Array[Byte](messageSize)))
+ MemoryRecords.withRecords(compressionType, records: _*)
+ }
val writables = new Array[Writable](numFiles)
val scheduler = new KafkaScheduler(1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 22cb899..8a198eb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,20 +17,21 @@
package kafka.coordinator
-import kafka.utils.timer.MockTimer
-import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
-import org.junit.Assert._
+import java.util.concurrent.TimeUnit
+
import kafka.common.{OffsetAndMetadata, Topic}
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
import kafka.utils._
+import kafka.utils.timer.MockTimer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite
-import java.util.concurrent.TimeUnit
import scala.collection._
import scala.concurrent.duration.Duration
@@ -305,7 +306,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
timer.advanceClock(DefaultSessionTimeout + 100)
@@ -1051,10 +1052,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
- new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
+ new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
@@ -1132,10 +1133,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
- new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
+ new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -1146,7 +1147,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val (responseFuture, responseCallback) = setupHeartbeatCallback
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 8cfae8d..6b1abf3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -24,19 +24,19 @@ import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, Topic}
import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
+import kafka.utils.TestUtils.fail
import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, TimestampType}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{Before, Test}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
-import kafka.utils.TestUtils.fail
+import org.junit.{Before, Test}
+import scala.collection.JavaConverters._
import scala.collection._
-import JavaConverters._
class GroupMetadataManagerTest {
@@ -94,7 +94,7 @@ class GroupMetadataManagerTest {
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
- val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords: _*)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
@@ -123,8 +123,9 @@ class GroupMetadataManagerTest {
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
- val tombstone = Record.create(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
- val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(tombstone): _*)
+ val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(tombstone): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -157,7 +158,8 @@ class GroupMetadataManagerTest {
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
- val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -183,8 +185,9 @@ class GroupMetadataManagerTest {
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
- val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
- val records = MemoryRecords.withRecords(startOffset, Seq(groupMetadataRecord, groupMetadataTombstone): _*)
+ val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ Seq(groupMetadataRecord, groupMetadataTombstone): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -212,8 +215,8 @@ class GroupMetadataManagerTest {
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
- val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
- val records = MemoryRecords.withRecords(startOffset,
+ val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -538,7 +541,7 @@ class GroupMetadataManagerTest {
EasyMock.reset(partition)
val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -549,13 +552,15 @@ class GroupMetadataManagerTest {
assertTrue(recordsCapture.hasCaptured)
val records = recordsCapture.getValue.records.asScala.toList
+ recordsCapture.getValue.batches.asScala.foreach { batch =>
+ assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+ }
assertEquals(1, records.size)
val metadataTombstone = records.head
assertTrue(metadataTombstone.hasKey)
- assertTrue(metadataTombstone.hasNullValue)
- assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
- assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
+ assertFalse(metadataTombstone.hasValue)
assertTrue(metadataTombstone.timestamp > 0)
val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
@@ -583,7 +588,7 @@ class GroupMetadataManagerTest {
EasyMock.reset(partition)
val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -594,14 +599,16 @@ class GroupMetadataManagerTest {
assertTrue(recordsCapture.hasCaptured)
val records = recordsCapture.getValue.records.asScala.toList
+ recordsCapture.getValue.batches.asScala.foreach { batch =>
+ assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
+ // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+ }
assertEquals(1, records.size)
val metadataTombstone = records.head
assertTrue(metadataTombstone.hasKey)
- assertTrue(metadataTombstone.hasNullValue)
- assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
- // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
- assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
+ assertFalse(metadataTombstone.hasValue)
assertTrue(metadataTombstone.timestamp > 0)
val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
@@ -672,7 +679,7 @@ class GroupMetadataManagerTest {
assertEquals(2, records.size)
records.foreach { message =>
assertTrue(message.hasKey)
- assertTrue(message.hasNullValue)
+ assertFalse(message.hasValue)
val offsetKey = GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey]
assertEquals(groupId, offsetKey.key.group)
assertEquals("foo", offsetKey.key.topicPartition.topic)
@@ -758,13 +765,13 @@ class GroupMetadataManagerTest {
EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
- new PartitionResponse(error, 0L, Record.NO_TIMESTAMP)
+ new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
}
- private def buildStableGroupRecordWithMember(memberId: String): Record = {
+ private def buildStableGroupRecordWithMember(memberId: String): SimpleRecord = {
val group = new GroupMetadata(groupId)
group.transitionTo(PreparingRebalance)
val memberProtocols = List(("roundrobin", Array.emptyByteArray))
@@ -776,13 +783,13 @@ class GroupMetadataManagerTest {
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
- Record.create(groupMetadataKey, groupMetadataValue)
+ new SimpleRecord(groupMetadataKey, groupMetadataValue)
}
private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
startOffset: Long,
records: MemoryRecords): Unit = {
- val endOffset = startOffset + records.deepEntries.asScala.size
+ val endOffset = startOffset + records.records.asScala.size
val logMock = EasyMock.mock(classOf[Log])
val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
@@ -798,12 +805,12 @@ class GroupMetadataManagerTest {
}
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
- groupId: String = groupId): Seq[Record] = {
+ groupId: String = groupId): Seq[SimpleRecord] = {
committedOffsets.map { case (topicPartition, offset) =>
val offsetAndMetadata = OffsetAndMetadata(offset)
val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
- Record.create(offsetCommitKey, offsetCommitValue)
+ new SimpleRecord(offsetCommitKey, offsetCommitValue)
}.toSeq
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 6a165ed..0b1978c 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -25,7 +25,7 @@ import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record.{SimpleRecord, CompressionType, MemoryRecords}
import org.apache.kafka.common.utils.Utils
import java.util.{Collection, Properties}
@@ -57,16 +57,16 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
/* append two messages */
log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
- Record.create("hello".getBytes), Record.create("there".getBytes)))
+ new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
- def readMessage(offset: Int) = log.read(offset, 4096).records.shallowEntries.iterator.next().record
+ def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next()
if (!brokerCompression.equals("producer")) {
val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)
- assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode.codec, readMessage(0).compressionType.id)
+ assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode.codec, readBatch(0).compressionType.id)
}
else
- assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode.codec, readMessage(0).compressionType.id)
+ assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode.codec, readBatch(0).compressionType.id)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 6a097f8..f2dbc6e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,11 +20,11 @@ package kafka.log
import java.io.File
import java.util.Properties
-import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0, KAFKA_0_11_0_IV0}
import kafka.server.OffsetCheckpoint
import kafka.utils._
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit._
@@ -55,7 +55,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@Test
def cleanerTest() {
val largeMessageKey = 20
- val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V1)
+ val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
val maxMessageSize = largeMessageSet.sizeInBytes
cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
@@ -146,7 +146,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@Test
def testCleanerWithMessageFormatV0(): Unit = {
val largeMessageKey = 20
- val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V0)
+ val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
val maxMessageSize = codec match {
case CompressionType.NONE => largeMessageSet.sizeInBytes
case _ =>
@@ -164,7 +164,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
log.config = new LogConfig(props)
- val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+ val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
val startSize = log.size
cleaner.startup()
@@ -176,15 +176,16 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
checkLogAfterAppendingDups(log, startSize, appends)
val appends2: Seq[(Int, String, Long)] = {
- val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+ val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
val appendInfo = log.append(largeMessageSet, assignOffsets = true)
val largeMessageOffset = appendInfo.firstOffset
- // also add some messages with version 1 to check that we handle mixed format versions correctly
- props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
+ // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
+ props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
log.config = new LogConfig(props)
- val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
- appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1
+ val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+ val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
+ appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
}
val firstDirty2 = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty2)
@@ -204,15 +205,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
// with compression enabled, these messages will be written as a single message containing
// all of the individual messages
- var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
- appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+ var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
+ appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
log.config = new LogConfig(props)
- var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
- appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
- appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
+ var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+ appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+ appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
val appends = appendsV0 ++ appendsV1
@@ -251,15 +252,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
import JavaConverters._
- for (segment <- log.logSegments; deepLogEntry <- segment.log.deepEntries.asScala) yield {
- val key = TestUtils.readString(deepLogEntry.record.key).toInt
- val value = TestUtils.readString(deepLogEntry.record.value)
+ for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
+ val key = TestUtils.readString(deepLogEntry.key).toInt
+ val value = TestUtils.readString(deepLogEntry.value)
(key, value, deepLogEntry.offset)
}
}
private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
- startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+ startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val value = counter.toString
val appendInfo = log.append(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
@@ -270,19 +271,18 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
- startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+ startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
counter += 1
(key, payload)
}
- val messages = kvs.map { case (key, payload) =>
- Record.create(magicValue, key.toString.getBytes, payload.toString.getBytes)
+ val records = kvs.map { case (key, payload) =>
+ new SimpleRecord(key.toString.getBytes, payload.toString.getBytes)
}
- val records = MemoryRecords.withRecords(codec, messages: _*)
- val appendInfo = log.append(records, assignOffsets = true)
+ val appendInfo = log.append(MemoryRecords.withRecords(magicValue, codec, records: _*), assignOffsets = true)
val offsets = appendInfo.firstOffset to appendInfo.lastOffset
kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index c24cb68..4d8c836 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,9 +106,9 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
private def readFromLog(log: Log): Iterable[(Int, Int)] = {
import JavaConverters._
- for (segment <- log.logSegments; logEntry <- segment.log.deepEntries.asScala) yield {
- val key = TestUtils.readString(logEntry.record.key).toInt
- val value = TestUtils.readString(logEntry.record.value).toInt
+ for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
+ val key = TestUtils.readString(record.key).toInt
+ val value = TestUtils.readString(record.value).toInt
key -> value
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index b1d2b33..3690c55 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import kafka.utils._
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Test}
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
while(log.numberOfSegments < 8)
- log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
+ log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()))
val topicPartition = new TopicPartition("log", 0)
val lastClean = Map(topicPartition -> 0L)
@@ -123,7 +123,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val t0 = time.milliseconds
while(log.numberOfSegments < 4)
- log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+ log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0))
val activeSegAtT0 = log.activeSegment
@@ -131,7 +131,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val t1 = time.milliseconds
while (log.numberOfSegments < 8)
- log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
+ log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1))
val topicPartition = new TopicPartition("log", 0)
val lastClean = Map(topicPartition -> 0L)
@@ -155,7 +155,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
val t0 = time.milliseconds
while (log.numberOfSegments < 8)
- log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+ log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0))
time.sleep(compactionLag + 1)
@@ -192,7 +192,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
- private def logEntries(key: Int, value: Int, timestamp: Long) =
- MemoryRecords.withRecords(Record.create(timestamp, key.toString.getBytes, value.toString.getBytes))
+ private def records(key: Int, value: Int, timestamp: Long) =
+ MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index cf1e4cb..18c1bbe 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -31,8 +31,8 @@ import org.junit.Assert._
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
+import scala.collection.JavaConverters._
import scala.collection._
-import JavaConverters._
/**
* Unit tests for the log cleaning logic
@@ -231,7 +231,7 @@ class LogCleanerTest extends JUnitSuite {
// the last (active) segment has just one message
- def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowEntries.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
+ def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
@@ -255,7 +255,7 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
- val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+ val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
@@ -273,7 +273,7 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
- val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+ val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
@@ -326,14 +326,14 @@ class LogCleanerTest extends JUnitSuite {
/* extract all the keys from a log */
def keysInLog(log: Log): Iterable[Int] =
- log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
+ log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(record => TestUtils.readString(record.key).toInt))
/* extract all the offsets from a log */
def offsetsInLog(log: Log): Iterable[Long] =
- log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
+ log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(m => m.offset))
def unkeyedMessageCountInLog(log: Log) =
- log.logSegments.map(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
+ log.logSegments.map(s => s.log.records.asScala.filter(_.hasValue).count(m => !m.hasKey)).sum
def abortCheckDone(topicPartition: TopicPartition): Unit = {
throw new LogCleaningAbortedException()
@@ -423,7 +423,7 @@ class LogCleanerTest extends JUnitSuite {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 400: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@@ -433,7 +433,7 @@ class LogCleanerTest extends JUnitSuite {
log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
// forward offset and append message to next segment at offset Int.MaxValue
- val records = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue - 1, Record.create("hello".getBytes, "hello".getBytes)))
+ val records = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
log.append(records, assignOffsets = false)
log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
@@ -602,8 +602,8 @@ class LogCleanerTest extends JUnitSuite {
def testBuildOffsetMapFakeLarge(): Unit = {
val map = new FakeOffsetMap(1000)
val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
- logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+ logProps.put(LogConfig.SegmentBytesProp, 120: java.lang.Integer)
+ logProps.put(LogConfig.SegmentIndexBytesProp, 120: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val logConfig = LogConfig(logProps)
val log = makeLog(config = logConfig)
@@ -681,10 +681,10 @@ class LogCleanerTest extends JUnitSuite {
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
- for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowEntries.asScala; deepLogEntry <- shallowLogEntry.asScala) {
- assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
- val value = TestUtils.readString(deepLogEntry.record.value).toLong
- assertEquals(deepLogEntry.offset, value)
+ for (segment <- log.logSegments; batch <- segment.log.batches.asScala; record <- batch.asScala) {
+ assertTrue(record.hasMagic(batch.magic))
+ val value = TestUtils.readString(record.value).toLong
+ assertEquals(record.offset, value)
}
}
@@ -703,9 +703,9 @@ class LogCleanerTest extends JUnitSuite {
val corruptedMessage = invalidCleanedMessage(offset, set)
val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
- for (logEntry <- records.deepEntries.asScala) {
+ for (logEntry <- records.records.asScala) {
val offset = logEntry.offset
- val value = TestUtils.readString(logEntry.record.value).toLong
+ val value = TestUtils.readString(logEntry.value).toLong
assertEquals(offset, value)
}
}
@@ -729,19 +729,19 @@ class LogCleanerTest extends JUnitSuite {
timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
- assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
// Append a message and roll out another log segment.
log.append(TestUtils.singletonRecords(value = "1".getBytes,
key = "1".getBytes,
timestamp = time.milliseconds()))
log.roll()
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
- assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
+ assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
}
private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
- yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+ yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).lastOffset
}
private def invalidCleanedMessage(initialOffset: Long,
@@ -751,25 +751,28 @@ class LogCleanerTest extends JUnitSuite {
// would write invalid compressed message sets with the outer magic set to 1 and the inner
// magic set to 0
val records = keysAndValues.map(kv =>
- Record.create(Record.MAGIC_VALUE_V0,
- Record.NO_TIMESTAMP,
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0,
+ RecordBatch.NO_TIMESTAMP,
kv._1.toString.getBytes,
kv._2.toString.getBytes))
val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
- val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME)
+ val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME, initialOffset)
var offset = initialOffset
records.foreach { record =>
- builder.appendUnchecked(offset, record)
+ builder.appendUncheckedWithOffset(offset, record)
offset += 1
}
builder.build()
}
- private def messageWithOffset(key: Int, value: Int, offset: Long) =
- MemoryRecords.withLogEntries(LogEntry.create(offset, Record.create(key.toString.getBytes, value.toString.getBytes)))
+ private def messageWithOffset(key: Array[Byte], value: Array[Byte], offset: Long): MemoryRecords =
+ MemoryRecords.withRecords(offset, CompressionType.NONE, new SimpleRecord(key, value))
+
+ private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
+ messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -797,10 +800,10 @@ class LogCleanerTest extends JUnitSuite {
record(key, value.toString.getBytes)
def record(key: Int, value: Array[Byte]) =
- MemoryRecords.withRecords(Record.create(key.toString.getBytes, value))
+ TestUtils.singletonRecords(key = key.toString.getBytes, value = value)
def unkeyedRecord(value: Int) =
- MemoryRecords.withRecords(Record.create(value.toString.getBytes))
+ TestUtils.singletonRecords(value = value.toString.getBytes)
def tombstoneRecord(key: Int) = record(key, null)
@@ -832,5 +835,5 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
def latestOffset: Long = lastOffset
- override def toString: String = map.toString()
+ override def toString: String = map.toString
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 1197b02..25b3480 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,7 +17,8 @@
package kafka.log
import kafka.utils.TestUtils
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
+import kafka.utils.TestUtils.checkEquals
+import org.apache.kafka.common.record.{RecordBatch, _}
import org.apache.kafka.common.utils.Time
import org.junit.Assert._
import org.junit.{After, Test}
@@ -46,7 +47,8 @@ class LogSegmentTest {
/* create a ByteBufferMessageSet for the given messages starting from the given offset */
def records(offset: Long, records: String*): MemoryRecords = {
- MemoryRecords.withRecords(offset, records.map(s => Record.create(Record.MAGIC_VALUE_V1, offset * 10, s.getBytes)):_*)
+ MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, CompressionType.NONE, TimestampType.CREATE_TIME,
+ records.map { s => new SimpleRecord(offset * 10, s.getBytes) }: _*)
}
@After
@@ -75,9 +77,9 @@ class LogSegmentTest {
def testReadBeforeFirstOffset() {
val seg = createSegment(40)
val ms = records(50, "hello", "there", "little", "bee")
- seg.append(50, 53, Record.NO_TIMESTAMP, -1L, ms)
+ seg.append(50, 53, RecordBatch.NO_TIMESTAMP, -1L, ms)
val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
- assertEquals(ms.deepEntries.asScala.toList, read.deepEntries.asScala.toList)
+ checkEquals(ms.records.iterator, read.records.iterator)
}
/**
@@ -89,10 +91,10 @@ class LogSegmentTest {
val baseOffset = 50
val seg = createSegment(baseOffset)
val ms = records(baseOffset, "hello", "there", "beautiful")
- seg.append(baseOffset, 52, Record.NO_TIMESTAMP, -1L, ms)
+ seg.append(baseOffset, 52, RecordBatch.NO_TIMESTAMP, -1L, ms)
def validate(offset: Long) =
- assertEquals(ms.deepEntries.asScala.filter(_.offset == offset).toList,
- seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepEntries.asScala.toList)
+ assertEquals(ms.records.asScala.filter(_.offset == offset).toList,
+ seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.records.asScala.toList)
validate(50)
validate(51)
validate(52)
@@ -105,7 +107,7 @@ class LogSegmentTest {
def testReadAfterLast() {
val seg = createSegment(40)
val ms = records(50, "hello", "there")
- seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+ seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
assertNull("Read beyond the last offset in the segment should give null", read)
}
@@ -118,11 +120,11 @@ class LogSegmentTest {
def testReadFromGap() {
val seg = createSegment(40)
val ms = records(50, "hello", "there")
- seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+ seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
val ms2 = records(60, "alpha", "beta")
- seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
+ seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
+ checkEquals(ms2.records.iterator, read.records.records.iterator)
}
/**
@@ -135,17 +137,17 @@ class LogSegmentTest {
var offset = 40
for (_ <- 0 until 30) {
val ms1 = records(offset, "hello")
- seg.append(offset, offset, Record.NO_TIMESTAMP, -1L, ms1)
+ seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
val ms2 = records(offset + 1, "hello")
- seg.append(offset + 1, offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
+ seg.append(offset + 1, offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
// check that we can read back both messages
val read = seg.read(offset, None, 10000)
- assertEquals(List(ms1.deepEntries.iterator.next(), ms2.deepEntries.iterator.next()), read.records.deepEntries.asScala.toList)
+ assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
// now truncate off the last message
seg.truncateTo(offset + 1)
val read2 = seg.read(offset, None, 10000)
- assertEquals(1, read2.records.deepEntries.asScala.size)
- assertEquals(ms1.deepEntries.iterator.next(), read2.records.deepEntries.iterator.next())
+ assertEquals(1, read2.records.records.asScala.size)
+ checkEquals(ms1.records.iterator, read2.records.records.iterator)
offset += 1
}
}
@@ -175,10 +177,10 @@ class LogSegmentTest {
def testTruncateFull() {
// test the case where we fully truncate the log
val seg = createSegment(40)
- seg.append(40, 41, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+ seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
seg.truncateTo(0)
assertNull("Segment should be empty.", seg.read(0, None, 1024))
- seg.append(40, 41, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+ seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
}
/**
@@ -214,7 +216,7 @@ class LogSegmentTest {
def testNextOffsetCalculation() {
val seg = createSegment(40)
assertEquals(40, seg.nextOffset)
- seg.append(50, 52, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
+ seg.append(50, 52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
assertEquals(53, seg.nextOffset())
}
@@ -241,12 +243,12 @@ class LogSegmentTest {
def testRecoveryFixesCorruptIndex() {
val seg = createSegment(0)
for(i <- 0 until 100)
- seg.append(i, i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
+ seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
val indexFile = seg.index.file
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(64*1024)
for(i <- 0 until 100)
- assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepEntries.iterator.next().offset)
+ assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
}
/**
@@ -277,7 +279,7 @@ class LogSegmentTest {
for (_ <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
- seg.append(i, i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
+ seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end
@@ -285,7 +287,8 @@ class LogSegmentTest {
val position = recordPosition.position + TestUtils.random.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
seg.recover(64*1024)
- assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowEntries.asScala.map(_.offset).toList)
+ assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
+ seg.log.batches.asScala.map(_.lastOffset).toList)
seg.delete()
}
}
@@ -293,7 +296,8 @@ class LogSegmentTest {
/* create a segment with pre allocate */
def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
val tempDir = TestUtils.tempDir()
- val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
+ val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists,
+ initFileSize = initFileSize, preallocate = preallocate)
segments += seg
seg
}
@@ -303,11 +307,11 @@ class LogSegmentTest {
def testCreateWithInitFileSizeAppendMessage() {
val seg = createSegment(40, false, 512*1024*1024, true)
val ms = records(50, "hello", "there")
- seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+ seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
val ms2 = records(60, "alpha", "beta")
- seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
+ seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
+ checkEquals(ms2.records.iterator, read.records.records.iterator)
}
/* create a segment with pre allocate and clearly shut down*/
@@ -317,11 +321,11 @@ class LogSegmentTest {
val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
val ms = records(50, "hello", "there")
- seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+ seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
val ms2 = records(60, "alpha", "beta")
- seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
+ seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
+ checkEquals(ms2.records.iterator, read.records.records.iterator)
val oldSize = seg.log.sizeInBytes()
val oldPosition = seg.log.channel.position
val oldFileSize = seg.log.file.length
@@ -334,7 +338,7 @@ class LogSegmentTest {
segments += segReopen
val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.deepEntries.asScala.toList, readAgain.records.deepEntries.asScala.toList)
+ checkEquals(ms2.records.iterator, readAgain.records.records.iterator)
val size = segReopen.log.sizeInBytes()
val position = segReopen.log.channel.position
val fileSize = segReopen.log.file.length