You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/14 18:41:39 UTC
[kafka] branch 2.0 updated: MINOR: Use ListOffsets request instead
of SimpleConsumer in LogOffsetTest (#5227)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 040eab6 MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227)
040eab6 is described below
commit 040eab6f13beb356b35afe2bd5be8e61fee2cb3a
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 14 11:38:00 2018 -0700
MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227)
Included a few clean-ups related to unused variables in tests.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../kafka/api/LegacyAdminClientTest.scala | 14 +-
.../kafka/integration/KafkaServerTestHarness.scala | 4 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 212 +++++++++----------
.../scala/unit/kafka/server/LogOffsetTest.scala | 224 +++++++++------------
4 files changed, 210 insertions(+), 244 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index b78946c..115ec05 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -24,7 +24,7 @@ import java.lang.{Long => JLong}
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Test}
@@ -147,16 +147,4 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
}, "Expected non-empty assignment")
}
- private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
- numRecords: Int,
- tp: TopicPartition) {
- val futures = (0 until numRecords).map { i =>
- val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
- debug(s"Sending this record: $record")
- producer.send(record)
- }
-
- futures.foreach(_.get)
- }
-
}
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 662d6d2..2c4a988 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -31,6 +31,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer}
import java.util.Properties
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.utils.Time
/**
* A test harness that brings up some number of broker nodes
@@ -82,6 +83,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
protected def trustStoreFile: Option[File] = None
protected def serverSaslProperties: Option[Properties] = None
protected def clientSaslProperties: Option[Properties] = None
+ protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
@Before
override def setUp() {
@@ -96,7 +98,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
// Add each broker to `servers` buffer as soon as it is created to ensure that brokers
// are shutdown cleanly in tearDown even if a subsequent broker fails to start
for (config <- configs)
- servers += TestUtils.createServer(config)
+ servers += TestUtils.createServer(config, time = brokerTime(config.brokerId))
brokerList = TestUtils.bootstrapServers(servers, listenerName)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6c62e5e..550b929 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -98,7 +98,7 @@ class LogTest {
val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
// create a log
- val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
// Test the segment rolling behavior when messages do not have a timestamp.
mockTime.sleep(log.config.segmentMs + 1)
@@ -143,7 +143,7 @@ class LogTest {
@Test(expected = classOf[OutOfOrderSequenceException])
def testNonSequentialAppend(): Unit = {
// create a log
- val log = createLog(logDir, LogConfig(), brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val pid = 1L
val epoch: Short = 0
@@ -156,7 +156,7 @@ class LogTest {
@Test
def testTruncateToEmptySegment(): Unit = {
- val log = createLog(logDir, LogConfig(), brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
// Force a segment roll by using a large offset. The first segment will be empty
val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
@@ -181,7 +181,7 @@ class LogTest {
// simulate the upgrade path by creating a new log with several segments, deleting the
// snapshot files, and then reloading the log
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
- var log = createLog(logDir, logConfig, brokerTopicStats)
+ var log = createLog(logDir, logConfig)
assertEquals(None, log.oldestProducerSnapshotOffset)
for (i <- 0 to 100) {
@@ -196,7 +196,7 @@ class LogTest {
deleteProducerSnapshotFiles()
// Reload after clean shutdown
- log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = logEndOffset)
+ log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
@@ -205,7 +205,7 @@ class LogTest {
deleteProducerSnapshotFiles()
// Reload after unclean shutdown with recoveryPoint set to log end offset
- log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = logEndOffset)
+ log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
// Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case
expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset)
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
@@ -214,7 +214,7 @@ class LogTest {
deleteProducerSnapshotFiles()
// Reload after unclean shutdown with recoveryPoint set to 0
- log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = 0L)
+ log = createLog(logDir, logConfig, recoveryPoint = 0L)
// Is this working as intended?
expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
@@ -224,7 +224,7 @@ class LogTest {
@Test
def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
- var log = createLog(logDir, logConfig, brokerTopicStats)
+ var log = createLog(logDir, logConfig)
assertEquals(None, log.oldestProducerSnapshotOffset)
for (i <- 0 to 100) {
@@ -323,7 +323,7 @@ class LogTest {
@Test
def testProducerIdMapOffsetUpdatedForNonIdempotentData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
log.appendAsLeader(records, leaderEpoch = 0)
log.takeProducerSnapshot()
@@ -514,7 +514,7 @@ class LogTest {
@Test
def testRebuildProducerIdMapWithCompactedData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
val seq = 0
@@ -557,7 +557,7 @@ class LogTest {
@Test
def testRebuildProducerStateWithEmptyCompactedBatch() {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
val seq = 0
@@ -598,7 +598,7 @@ class LogTest {
@Test
def testUpdateProducerIdMapWithCompactedData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
val seq = 0
@@ -631,7 +631,7 @@ class LogTest {
@Test
def testProducerIdMapTruncateTo() {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0)
log.takeProducerSnapshot()
@@ -652,7 +652,7 @@ class LogTest {
def testProducerIdMapTruncateToWithNoSnapshots() {
// This ensures that the upgrade optimization path cannot be hit after initial loading
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid = 1L
val epoch = 0.toShort
@@ -676,7 +676,7 @@ class LogTest {
@Test
def testLoadProducersAfterDeleteRecordsMidSegment(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid1 = 1L
val pid2 = 2L
val epoch = 0.toShort
@@ -696,7 +696,7 @@ class LogTest {
log.close()
- val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, brokerTopicStats = brokerTopicStats)
+ val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
@@ -705,7 +705,7 @@ class LogTest {
@Test
def testLoadProducersAfterDeleteRecordsOnSegment(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid1 = 1L
val pid2 = 2L
val epoch = 0.toShort
@@ -731,7 +731,7 @@ class LogTest {
log.close()
- val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, brokerTopicStats = brokerTopicStats)
+ val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2)
assertEquals(retainedLastSeqOpt, reloadedEntryOpt)
@@ -741,7 +741,7 @@ class LogTest {
def testProducerIdMapTruncateFullyAndStartAt() {
val records = TestUtils.singletonRecords("foo".getBytes)
val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.appendAsLeader(records, leaderEpoch = 0)
log.takeProducerSnapshot()
@@ -764,7 +764,7 @@ class LogTest {
val pid1 = 1L
val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0)
val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.appendAsLeader(records, leaderEpoch = 0)
log.takeProducerSnapshot()
@@ -788,7 +788,7 @@ class LogTest {
@Test
def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() {
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
log.roll(1L)
assertEquals(Some(1L), log.latestProducerSnapshotOffset)
@@ -821,7 +821,7 @@ class LogTest {
def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = {
val producerId = 1L
val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
producerId = producerId, producerEpoch = 0, sequence = 0),
@@ -853,7 +853,7 @@ class LogTest {
@Test
def testRebuildTransactionalState(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid = 137L
val epoch = 5.toShort
@@ -874,7 +874,7 @@ class LogTest {
log.close()
- val reopenedLog = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val reopenedLog = createLog(logDir, logConfig)
reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
assertEquals(None, reopenedLog.firstUnstableOffset)
}
@@ -897,7 +897,7 @@ class LogTest {
val pid = 23L
val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs,
- producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, brokerTopicStats = brokerTopicStats)
+ producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs)
val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes))
log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0)
@@ -913,7 +913,7 @@ class LogTest {
@Test
def testDuplicateAppends(): Unit = {
// create a log
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val pid = 1L
val epoch: Short = 0
@@ -987,7 +987,7 @@ class LogTest {
@Test
def testMultipleProducerIdsPerMemoryRecord() : Unit = {
// create a log
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val epoch: Short = 0
val buffer = ByteBuffer.allocate(512)
@@ -1033,7 +1033,7 @@ class LogTest {
@Test
def testDuplicateAppendToFollower() : Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch: Short = 0
val pid = 1L
val baseSequence = 0
@@ -1054,7 +1054,7 @@ class LogTest {
@Test
def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid1 = 1L
val pid2 = 2L
@@ -1106,7 +1106,7 @@ class LogTest {
@Test(expected = classOf[ProducerFencedException])
def testOldProducerEpoch(): Unit = {
// create a log
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val pid = 1L
val newEpoch: Short = 1
val oldEpoch: Short = 0
@@ -1128,7 +1128,7 @@ class LogTest {
val maxJitter = 20 * 60L
// create a log
val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
log.appendAsLeader(set, leaderEpoch = 0)
@@ -1153,7 +1153,7 @@ class LogTest {
val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@ -1168,7 +1168,7 @@ class LogTest {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0)
}
@@ -1178,7 +1178,7 @@ class LogTest {
@Test
def testAppendAndReadWithSequentialOffsets() {
val logConfig = LogTest.createLogConfig(segmentBytes = 71)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
for(value <- values)
@@ -1202,7 +1202,7 @@ class LogTest {
@Test
def testAppendAndReadWithNonSequentialOffsets() {
val logConfig = LogTest.createLogConfig(segmentBytes = 72)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1226,7 +1226,7 @@ class LogTest {
@Test
def testReadAtLogGap() {
val logConfig = LogTest.createLogConfig(segmentBytes = 300)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
@@ -1242,7 +1242,7 @@ class LogTest {
@Test(expected = classOf[KafkaStorageException])
def testLogRollAfterLogHandlerClosed() {
val logConfig = LogTest.createLogConfig()
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.closeHandlers()
log.roll(1)
}
@@ -1250,7 +1250,7 @@ class LogTest {
@Test
def testReadWithMinMessage() {
val logConfig = LogTest.createLogConfig(segmentBytes = 72)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1277,7 +1277,7 @@ class LogTest {
@Test
def testReadWithTooSmallMaxLength() {
val logConfig = LogTest.createLogConfig(segmentBytes = 72)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -1311,7 +1311,7 @@ class LogTest {
createEmptyLogs(logDir, 1024)
// set up replica log starting with offset 1024 and with one message (at offset 1024)
val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
assertEquals("Reading at the log end offset should produce 0 byte read.", 0,
@@ -1343,7 +1343,7 @@ class LogTest {
def testLogRolls() {
/* create a multipart log with 100 messages */
val logConfig = LogTest.createLogConfig(segmentBytes = 100)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
timestamp = mockTime.milliseconds))
@@ -1381,7 +1381,7 @@ class LogTest {
def testCompressedMessages() {
/* this log should roll after every messageset */
val logConfig = LogTest.createLogConfig(segmentBytes = 110)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
@@ -1405,7 +1405,7 @@ class LogTest {
logDir.mkdirs()
// first test a log segment starting at 0
val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs = 0)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
for(i <- 0 until messagesToAppend)
log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0)
@@ -1439,7 +1439,7 @@ class LogTest {
// append messages to log
val configSegmentSize = messageSet.sizeInBytes - 1
val logConfig = LogTest.createLogConfig(segmentBytes = configSegmentSize)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
try {
log.appendAsLeader(messageSet, leaderEpoch = 0)
@@ -1464,7 +1464,7 @@ class LogTest {
val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage)
val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
try {
log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
@@ -1505,7 +1505,7 @@ class LogTest {
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
val logConfig = LogTest.createLogConfig(maxMessageBytes = maxMessageSize)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// should be able to append the small message
log.appendAsLeader(first, leaderEpoch = 0)
@@ -1527,7 +1527,7 @@ class LogTest {
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096)
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
@@ -1554,12 +1554,12 @@ class LogTest {
assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
}
- log = createLog(logDir, logConfig, recoveryPoint = lastOffset, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig, recoveryPoint = lastOffset)
verifyRecoveredLog(log, lastOffset)
log.close()
// test recovery case
- log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig)
verifyRecoveredLog(log, lastOffset)
log.close()
}
@@ -1571,7 +1571,7 @@ class LogTest {
def testBuildTimeIndexWhenNotAssigningOffsets() {
val numMessages = 100
val logConfig = LogTest.createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val messages = (0 until numMessages).map { i =>
MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(mockTime.milliseconds + i, i.toString.getBytes()))
@@ -1591,7 +1591,7 @@ class LogTest {
// publish the messages and close the log
val numMessages = 200
val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
val indexFiles = log.logSegments.map(_.offsetIndex.file)
@@ -1603,7 +1603,7 @@ class LogTest {
timeIndexFiles.foreach(_.delete())
// reopen the log
- log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
assertTrue("The index should have been rebuilt", log.logSegments.head.offsetIndex.entries > 0)
assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -1625,7 +1625,7 @@ class LogTest {
val numMessages = 200
val segmentSize = 200
val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0")
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
for (i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
@@ -1636,7 +1636,7 @@ class LogTest {
timeIndexFiles.foreach(file => Files.delete(file.toPath))
// The rebuilt time index should be empty
- log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1)
for (segment <- log.logSegments.init) {
assertEquals("The time index should be empty", 0, segment.timeIndex.entries)
assertEquals("The time index file size should be 0", 0, segment.timeIndex.file.length)
@@ -1651,7 +1651,7 @@ class LogTest {
// publish the messages and close the log
val numMessages = 200
val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
val indexFiles = log.logSegments.map(_.offsetIndex.file)
@@ -1673,7 +1673,7 @@ class LogTest {
}
// reopen the log
- log = createLog(logDir, logConfig, recoveryPoint = 200L, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig, recoveryPoint = 200L)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages) {
assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -1697,7 +1697,7 @@ class LogTest {
// create a log
val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (_ <- 1 to msgPerSeg)
@@ -1749,7 +1749,7 @@ class LogTest {
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@@ -1788,7 +1788,7 @@ class LogTest {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
@@ -1810,13 +1810,13 @@ class LogTest {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
// create a log
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000)
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
// add enough messages to roll over several segments then close and re-open and attempt to truncate
for (_ <- 0 until 100)
log.appendAsLeader(createRecords, leaderEpoch = 0)
log.close()
- log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig)
log.truncateTo(3)
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -1831,7 +1831,7 @@ class LogTest {
val asyncDeleteMs = 1000
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 100)
@@ -1864,7 +1864,7 @@ class LogTest {
def testOpenDeletesObsoleteFiles() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 100)
@@ -1874,13 +1874,13 @@ class LogTest {
log.onHighWatermarkIncremented(log.logEndOffset)
log.deleteOldSegments()
log.close()
- log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
@Test
def testAppendMessageWithNullPayload() {
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
val head = log.readUncommitted(0, 4096, None).records.records.iterator.next()
assertEquals(0, head.offset)
@@ -1889,7 +1889,7 @@ class LogTest {
@Test
def testAppendWithOutOfOrderOffsetsThrowsException() {
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
val buffer = ByteBuffer.allocate(512)
@@ -1910,7 +1910,7 @@ class LogTest {
@Test
def testAppendBelowExpectedOffsetThrowsException() {
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
@@ -1957,7 +1957,7 @@ class LogTest {
@Test
def testAppendWithNoTimestamp(): Unit = {
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
}
@@ -1971,7 +1971,7 @@ class LogTest {
for (_ <- 0 until 10) {
// create a log and write some messages to it
logDir.mkdirs()
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
val numMessages = 50 + TestUtils.random.nextInt(50)
for (_ <- 0 until numMessages)
log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2005,7 +2005,7 @@ class LogTest {
def testOverCompactedLogRecovery(): Unit = {
// append some messages to create some segments
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
@@ -2038,7 +2038,7 @@ class LogTest {
def testOverCompactedLogRecoveryMultiRecord(): Unit = {
// append some messages to create some segments
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, 0,
new SimpleRecord("v3".getBytes(), "k3".getBytes()),
@@ -2077,7 +2077,7 @@ class LogTest {
def testOverCompactedLogRecoveryMultiRecordV1(): Unit = {
// append some messages to create some segments
val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val set1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0, CompressionType.NONE,
new SimpleRecord("v1".getBytes(), "k1".getBytes()))
val set2 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP,
@@ -2292,7 +2292,7 @@ class LogTest {
assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
var recoveryPoint = 0L
// create a log and write some messages to it
- var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ var log = createLog(logDir, logConfig)
for (_ <- 0 until 100)
log.appendAsLeader(createRecords, leaderEpoch = 0)
log.close()
@@ -2300,7 +2300,7 @@ class LogTest {
// check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
// clean shutdown file exists.
recoveryPoint = log.logEndOffset
- log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ log = createLog(logDir, logConfig)
assertEquals(recoveryPoint, log.logEndOffset)
Utils.delete(cleanShutdownFile)
}
@@ -2461,7 +2461,7 @@ class LogTest {
def testDeleteOldSegments() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 100)
@@ -2511,7 +2511,7 @@ class LogTest {
def testLogDeletionAfterClose() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2529,7 +2529,7 @@ class LogTest {
def testLogDeletionAfterDeleteRecords() {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
for (_ <- 0 until 15)
log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2561,7 +2561,7 @@ class LogTest {
def shouldDeleteSizeBasedSegments() {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 15)
@@ -2576,7 +2576,7 @@ class LogTest {
def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 15)
@@ -2591,7 +2591,7 @@ class LogTest {
def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 15)
@@ -2606,7 +2606,7 @@ class LogTest {
def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 15)
@@ -2621,7 +2621,7 @@ class LogTest {
def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 15)
@@ -2640,7 +2640,7 @@ class LogTest {
def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete")
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// append some messages to create some segments
for (_ <- 0 until 15)
@@ -2657,7 +2657,7 @@ class LogTest {
//Given this partition is on leader epoch 72
val epoch = 72
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
log.leaderEpochCache.assign(epoch, records.size)
//When appending messages as a leader (i.e. assignOffsets = true)
@@ -2689,7 +2689,7 @@ class LogTest {
recs
}
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
//When appending as follower (assignOffsets = false)
for (i <- records.indices)
@@ -2702,7 +2702,7 @@ class LogTest {
def shouldTruncateLeaderEpochsWhenDeletingSegments() {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val cache = epochCache(log)
// Given three segments of 5 messages each
@@ -2727,7 +2727,7 @@ class LogTest {
def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val cache = epochCache(log)
// Given three segments of 5 messages each
@@ -2752,7 +2752,7 @@ class LogTest {
def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val cache = epochCache(log)
//Given 2 segments, 10 messages per segment
@@ -2797,7 +2797,7 @@ class LogTest {
*/
@Test
def testLogRecoversForLeaderEpoch() {
- val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, LogConfig())
val leaderEpochCache = epochCache(log)
val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
log.appendAsFollower(records = firstBatch)
@@ -2819,7 +2819,7 @@ class LogTest {
log.close()
// reopen the log and recover from the beginning
- val recoveredLog = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+ val recoveredLog = createLog(logDir, LogConfig())
val recoveredLeaderEpochCache = epochCache(recoveredLog)
// epoch entries should be recovered
@@ -2848,7 +2848,7 @@ class LogTest {
def testFirstUnstableOffsetNoTransactionalData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("foo".getBytes),
@@ -2862,7 +2862,7 @@ class LogTest {
@Test
def testFirstUnstableOffsetWithTransactionalData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val pid = 137L
val epoch = 5.toShort
@@ -2900,7 +2900,7 @@ class LogTest {
@Test
def testTransactionIndexUpdated(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid1 = 1L
@@ -2941,7 +2941,7 @@ class LogTest {
@Test
def testFullTransactionIndexRecovery(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid1 = 1L
@@ -2984,7 +2984,7 @@ class LogTest {
log.close()
val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
- val reloadedLog = createLog(logDir, reloadedLogConfig, brokerTopicStats = brokerTopicStats)
+ val reloadedLog = createLog(logDir, reloadedLogConfig)
val abortedTransactions = allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}
@@ -2992,7 +2992,7 @@ class LogTest {
@Test
def testRecoverOnlyLastSegment(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid1 = 1L
@@ -3035,7 +3035,7 @@ class LogTest {
log.close()
val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
- val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, brokerTopicStats = brokerTopicStats)
+ val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint)
val abortedTransactions = allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}
@@ -3043,7 +3043,7 @@ class LogTest {
@Test
def testRecoverLastSegmentWithNoSnapshots(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid1 = 1L
@@ -3089,7 +3089,7 @@ class LogTest {
log.close()
val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
- val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, brokerTopicStats = brokerTopicStats)
+ val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint)
val abortedTransactions = allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}
@@ -3098,7 +3098,7 @@ class LogTest {
def testTransactionIndexUpdatedThroughReplication(): Unit = {
val epoch = 0.toShort
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val buffer = ByteBuffer.allocate(2048)
val pid1 = 1L
@@ -3144,7 +3144,7 @@ class LogTest {
val pid = 1L
val epoch = 0.toShort
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val append = appendTransactionalAsLeader(log, pid, epoch)
@@ -3160,7 +3160,7 @@ class LogTest {
@Test
def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid = 1L
val appendPid = appendTransactionalAsLeader(log, pid, epoch)
@@ -3184,7 +3184,7 @@ class LogTest {
@Test
def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val epoch = 0.toShort
val pid = 1L
val appendPid = appendTransactionalAsLeader(log, pid, epoch)
@@ -3211,7 +3211,7 @@ class LogTest {
@Test
def testLastStableOffsetWithMixedProducerData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
// for convenience, both producers share the same epoch
val epoch = 5.toShort
@@ -3272,7 +3272,7 @@ class LogTest {
new SimpleRecord("c".getBytes))
val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes)
- val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+ val log = createLog(logDir, logConfig)
val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
@@ -3487,7 +3487,7 @@ object LogTest {
createLogConfig(indexIntervalBytes = 1)
var log = createLog(logDir, logConfig, brokerTopicStats, scheduler, time)
- var inputRecords = ListBuffer[Record]()
+ val inputRecords = ListBuffer[Record]()
// References to files we want to "merge" to emulate offset overflow
val toMerge = ListBuffer[File]()
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index aa8236a..dd4f7e3 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -21,71 +21,56 @@ import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, Random}
-import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
import kafka.log.{Log, LogSegment}
-import kafka.utils.TestUtils._
-import kafka.utils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.network.SocketServer
+import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, ListOffsetRequest, ListOffsetResponse}
import org.easymock.{EasyMock, IAnswer}
import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-class LogOffsetTest extends ZooKeeperTestHarness {
- val random = new Random()
- var logDir: File = null
- var topicLogDir: File = null
- var server: KafkaServer = null
- var logSize: Int = 140
- var simpleConsumer: SimpleConsumer = null
- var time: Time = new MockTime()
-
- @Before
- override def setUp() {
- super.setUp()
- val config: Properties = createBrokerConfig(1)
- config.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
- val logDirPath = config.getProperty("log.dir")
- logDir = new File(logDirPath)
- time = new MockTime()
- server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
- simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "")
- }
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class LogOffsetTest extends BaseRequestTest {
+
+ private lazy val time = new MockTime
+
+ protected override def numBrokers = 1
- @After
- override def tearDown() {
- simpleConsumer.close
- TestUtils.shutdownServers(Seq(server))
- super.tearDown()
+ protected override def brokerTime(brokerId: Int) = time
+
+ protected override def propertyOverrides(props: Properties): Unit = {
+ props.put("log.flush.interval.messages", "1")
+ props.put("num.partitions", "20")
+ props.put("log.retention.hours", "10")
+ props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
+ props.put("log.segment.bytes", "140")
}
@Test
def testGetOffsetsForUnknownTopic() {
- val topicAndPartition = TopicAndPartition("foo", 0)
- val request = OffsetRequest(
- Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
- val offsetResponse = simpleConsumer.getOffsetsBefore(request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
+ val topicPartition = new TopicPartition("foo", 0)
+ val request = ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .setOffsetData(Map(topicPartition ->
+ new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 10)).asJava).build(0)
+ val response = sendListOffsetsRequest(request)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData.get(topicPartition).error)
}
@Test
def testGetOffsetsAfterDeleteRecords() {
- val topicPartition = "kafka-" + 0
- val topic = topicPartition.split("-").head
- val part = Integer.valueOf(topicPartition.split("-").last).intValue
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, 0)
- // setup brokers in ZooKeeper as owners of partitions for this test
adminZkClient.createTopic(topic, 1, 1)
val logManager = server.getLogManager
- waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
+ TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
"Log for partition [topic,0] should be created")
- val log = logManager.getLog(new TopicPartition(topic, part)).get
+ val log = logManager.getLog(topicPartition).get
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -95,93 +80,87 @@ class LogOffsetTest extends ZooKeeperTestHarness {
log.maybeIncrementLogStartOffset(3)
log.deleteOldSegments()
- val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+ val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
- waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
- val topicAndPartition = TopicAndPartition(topic, part)
- val offsetRequest = OffsetRequest(
- Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
- replicaId = 0)
- val consumerOffsets =
- simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ "Leader should be elected")
+ val request = ListOffsetRequest.Builder.forReplica(0, 0)
+ .setOffsetData(Map(topicPartition ->
+ new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build()
+ val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
}
@Test
def testGetOffsetsBeforeLatestTime() {
- val topicPartition = "kafka-" + 0
- val topic = topicPartition.split("-").head
- val part = Integer.valueOf(topicPartition.split("-").last).intValue
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, 0)
- // setup brokers in ZooKeeper as owners of partitions for this test
adminZkClient.createTopic(topic, 1, 1)
val logManager = server.getLogManager
- waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
- "Log for partition [topic,0] should be created")
- val log = logManager.getLog(new TopicPartition(topic, part)).get
+ TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+ s"Log for partition $topicPartition should be created")
+ val log = logManager.getLog(topicPartition).get
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
log.flush()
- val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+ val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
- waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
- val topicAndPartition = TopicAndPartition(topic, part)
- val offsetRequest = OffsetRequest(
- Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
- replicaId = 0)
- val consumerOffsets =
- simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ "Leader should be elected")
+ val request = ListOffsetRequest.Builder.forReplica(0, 0)
+ .setOffsetData(Map(topicPartition ->
+ new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build()
+ val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
// try to fetch using latest offset
- val fetchResponse = simpleConsumer.fetch(
- new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
- assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
+ val fetchRequest = FetchRequest.Builder.forConsumer(0, 1,
+ Map(topicPartition -> new FetchRequest.PartitionData(consumerOffsets.head, FetchRequest.INVALID_LOG_START_OFFSET,
+ 300 * 1024)).asJava).build()
+ val fetchResponse = sendFetchRequest(fetchRequest)
+ assertFalse(fetchResponse.responseData.get(topicPartition).records.batches.iterator.hasNext)
}
@Test
def testEmptyLogsGetOffsets() {
- val topicPartition = "kafka-" + random.nextInt(10)
- val topicPartitionPath = TestUtils.tempDir().getAbsolutePath + "/" + topicPartition
- topicLogDir = new File(topicPartitionPath)
+ val random = new Random
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, random.nextInt(10))
+ val topicPartitionPath = s"${TestUtils.tempDir().getAbsolutePath}/$topic-${topicPartition.partition}"
+ val topicLogDir = new File(topicPartitionPath)
topicLogDir.mkdir()
- val topic = topicPartition.split("-").head
-
- // setup brokers in ZooKeeper as owners of partitions for this test
- createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+ createTopic(topic, numPartitions = 1, replicationFactor = 1)
var offsetChanged = false
for (_ <- 1 to 14) {
- val topicAndPartition = TopicAndPartition(topic, 0)
- val offsetRequest =
- OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
- val consumerOffsets =
- simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-
- if(consumerOffsets.head == 1) {
+ val topicPartition = new TopicPartition(topic, 0)
+ val request = ListOffsetRequest.Builder.forReplica(0, 0)
+ .setOffsetData(Map(topicPartition ->
+ new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 1)).asJava).build()
+ val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
+ if (consumerOffsets.head == 1)
offsetChanged = true
- }
}
assertFalse(offsetChanged)
}
@Test
def testGetOffsetsBeforeNow() {
- val topicPartition = "kafka-" + random.nextInt(3)
- val topic = topicPartition.split("-").head
- val part = Integer.valueOf(topicPartition.split("-").last).intValue
+ val random = new Random
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, random.nextInt(3))
- // setup brokers in ZooKeeper as owners of partitions for this test
adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.initialDefaultConfig)
+ val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -189,42 +168,42 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
- val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), now, 15)
+ val offsets = server.apis.fetchOffsets(logManager, topicPartition, now, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
- waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
- val topicAndPartition = TopicAndPartition(topic, part)
- val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 15)), replicaId = 0)
- val consumerOffsets =
- simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ "Leader should be elected")
+ val request = ListOffsetRequest.Builder.forReplica(0, 0)
+ .setOffsetData(Map(topicPartition ->
+ new ListOffsetRequest.PartitionData(now, 15)).asJava).build()
+ val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
}
@Test
def testGetOffsetsBeforeEarliestTime() {
- val topicPartition = "kafka-" + random.nextInt(3)
- val topic = topicPartition.split("-").head
- val part = Integer.valueOf(topicPartition.split("-").last).intValue
+ val random = new Random
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, random.nextInt(3))
- // setup brokers in ZooKeeper as owners of partitions for this test
adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
- val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.initialDefaultConfig)
+ val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
log.flush()
- val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10)
+ val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 10)
assertEquals(Seq(0L), offsets)
- waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
- val topicAndPartition = TopicAndPartition(topic, part)
- val offsetRequest =
- OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
- val consumerOffsets =
- simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ "Leader should be elected")
+ val request = ListOffsetRequest.Builder.forReplica(0, 0)
+ .setOffsetData(Map(topicPartition ->
+ new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 10)).asJava).build()
+ val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
assertEquals(Seq(0L), consumerOffsets)
}
@@ -264,19 +243,16 @@ class LogOffsetTest extends ZooKeeperTestHarness {
server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
}
- private def createBrokerConfig(nodeId: Int): Properties = {
- val props = new Properties
- props.put("broker.id", nodeId.toString)
- props.put("port", TestUtils.RandomPort.toString())
- props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
- props.put("log.flush.interval.messages", "1")
- props.put("enable.zookeeper", "false")
- props.put("num.partitions", "20")
- props.put("log.retention.hours", "10")
- props.put("log.retention.check.interval.ms", (5*1000*60).toString)
- props.put("log.segment.bytes", logSize.toString)
- props.put("zookeeper.connect", zkConnect.toString)
- props
+ private def server: KafkaServer = servers.head
+
+ private def sendListOffsetsRequest(request: ListOffsetRequest, destination: Option[SocketServer] = None): ListOffsetResponse = {
+ val response = connectAndSend(request, ApiKeys.LIST_OFFSETS, destination = destination.getOrElse(anySocketServer))
+ ListOffsetResponse.parse(response, request.version)
+ }
+
+ private def sendFetchRequest(request: FetchRequest, destination: Option[SocketServer] = None): FetchResponse[MemoryRecords] = {
+ val response = connectAndSend(request, ApiKeys.FETCH, destination = destination.getOrElse(anySocketServer))
+ FetchResponse.parse(response, request.version)
}
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.