Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/14 18:41:39 UTC

[kafka] branch 2.0 updated: MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 040eab6  MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227)
040eab6 is described below

commit 040eab6f13beb356b35afe2bd5be8e61fee2cb3a
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Jun 14 11:38:00 2018 -0700

    MINOR: Use ListOffsets request instead of SimpleConsumer in LogOffsetTest (#5227)
    
    Included a few clean-ups related to unused variables in tests.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../kafka/api/LegacyAdminClientTest.scala          |  14 +-
 .../kafka/integration/KafkaServerTestHarness.scala |   4 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 212 +++++++++----------
 .../scala/unit/kafka/server/LogOffsetTest.scala    | 224 +++++++++------------
 4 files changed, 210 insertions(+), 244 deletions(-)
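For context, the heart of this patch is the move from the deprecated Scala SimpleConsumer API to the Java ListOffsetRequest built via its Builder. A minimal before/after sketch, assuming a sendListOffsetsRequest test helper like the one the rewritten LogOffsetTest uses (the diff below shows the real call sites):

    // Before: deprecated Scala SimpleConsumer API
    val topicAndPartition = TopicAndPartition("foo", 0)
    val offsetRequest = OffsetRequest(
      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
    val oldError = simpleConsumer.getOffsetsBefore(offsetRequest)
      .partitionErrorAndOffsets(topicAndPartition).error

    // After: Java ListOffsetRequest, built with a version and sent to the broker
    import scala.collection.JavaConverters._
    val topicPartition = new TopicPartition("foo", 0)
    val request = ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
      .setOffsetData(Map(topicPartition ->
        new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 10)).asJava)
      .build(0)
    val response = sendListOffsetsRequest(request) // test helper defined in LogOffsetTest
    val newError = response.responseData.get(topicPartition).error
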

diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index b78946c..115ec05 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -24,7 +24,7 @@ import java.lang.{Long => JLong}
 
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Test}
@@ -147,16 +147,4 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
     }, "Expected non-empty assignment")
   }
 
-  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
-                          numRecords: Int,
-                          tp: TopicPartition) {
-    val futures = (0 until numRecords).map { i =>
-      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
-      debug(s"Sending this record: $record")
-      producer.send(record)
-    }
-
-    futures.foreach(_.get)
-  }
-
 }
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 662d6d2..2c4a988 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -31,6 +31,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer}
 import java.util.Properties
 
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.utils.Time
 
 /**
  * A test harness that brings up some number of broker nodes
@@ -82,6 +83,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   protected def trustStoreFile: Option[File] = None
   protected def serverSaslProperties: Option[Properties] = None
   protected def clientSaslProperties: Option[Properties] = None
+  protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
 
   @Before
   override def setUp() {
@@ -96,7 +98,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
     // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
     // are shutdown cleanly in tearDown even if a subsequent broker fails to start
     for (config <- configs)
-      servers += TestUtils.createServer(config)
+      servers += TestUtils.createServer(config, time = brokerTime(config.brokerId))
     brokerList = TestUtils.bootstrapServers(servers, listenerName)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
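
The harness change above adds a per-broker time hook so a subclass can run its brokers on a mock clock instead of wall-clock time; the rewritten LogOffsetTest (further below) overrides it to return a shared MockTime. A minimal sketch of the pattern, assuming the harness's usual generateConfigs contract (the class name and config helper here are illustrative):

    import kafka.integration.KafkaServerTestHarness
    import kafka.server.KafkaConfig
    import kafka.utils.{MockTime, TestUtils}
    import org.apache.kafka.common.utils.Time

    class MockTimeBrokerTest extends KafkaServerTestHarness {
      // One shared MockTime so the test can advance the broker clock manually.
      private lazy val mockTime = new MockTime

      // Single-broker setup; zkConnect is inherited from ZooKeeperTestHarness.
      def generateConfigs: Seq[KafkaConfig] =
        TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)

      // Every broker created by setUp() now runs on mockTime.
      protected override def brokerTime(brokerId: Int): Time = mockTime
    }
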
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 6c62e5e..550b929 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -98,7 +98,7 @@ class LogTest {
     val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
 
     // create a log
-    val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     // Test the segment rolling behavior when messages do not have a timestamp.
     mockTime.sleep(log.config.segmentMs + 1)
@@ -143,7 +143,7 @@ class LogTest {
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testNonSequentialAppend(): Unit = {
     // create a log
-    val log = createLog(logDir, LogConfig(), brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     val pid = 1L
     val epoch: Short = 0
 
@@ -156,7 +156,7 @@ class LogTest {
 
   @Test
   def testTruncateToEmptySegment(): Unit = {
-    val log = createLog(logDir, LogConfig(), brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
 
     // Force a segment roll by using a large offset. The first segment will be empty
     val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
@@ -181,7 +181,7 @@ class LogTest {
     // simulate the upgrade path by creating a new log with several segments, deleting the
     // snapshot files, and then reloading the log
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
-    var log = createLog(logDir, logConfig, brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     assertEquals(None, log.oldestProducerSnapshotOffset)
 
     for (i <- 0 to 100) {
@@ -196,7 +196,7 @@ class LogTest {
     deleteProducerSnapshotFiles()
 
     // Reload after clean shutdown
-    log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = logEndOffset)
+    log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
     var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
     log.close()
@@ -205,7 +205,7 @@ class LogTest {
     deleteProducerSnapshotFiles()
 
     // Reload after unclean shutdown with recoveryPoint set to log end offset
-    log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = logEndOffset)
+    log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
     // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case
     expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset)
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
@@ -214,7 +214,7 @@ class LogTest {
     deleteProducerSnapshotFiles()
 
     // Reload after unclean shutdown with recoveryPoint set to 0
-    log = createLog(logDir, logConfig, brokerTopicStats, recoveryPoint = 0L)
+    log = createLog(logDir, logConfig, recoveryPoint = 0L)
     // Is this working as intended?
     expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset
     assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
@@ -224,7 +224,7 @@ class LogTest {
   @Test
   def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
-    var log = createLog(logDir, logConfig, brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     assertEquals(None, log.oldestProducerSnapshotOffset)
 
     for (i <- 0 to 100) {
@@ -323,7 +323,7 @@ class LogTest {
   @Test
   def testProducerIdMapOffsetUpdatedForNonIdempotentData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
@@ -514,7 +514,7 @@ class LogTest {
   @Test
   def testRebuildProducerIdMapWithCompactedData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -557,7 +557,7 @@ class LogTest {
   @Test
   def testRebuildProducerStateWithEmptyCompactedBatch() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -598,7 +598,7 @@ class LogTest {
   @Test
   def testUpdateProducerIdMapWithCompactedData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -631,7 +631,7 @@ class LogTest {
   @Test
   def testProducerIdMapTruncateTo() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0)
     log.takeProducerSnapshot()
@@ -652,7 +652,7 @@ class LogTest {
   def testProducerIdMapTruncateToWithNoSnapshots() {
     // This ensures that the upgrade optimization path cannot be hit after initial loading
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
 
@@ -676,7 +676,7 @@ class LogTest {
   @Test
   def testLoadProducersAfterDeleteRecordsMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val pid2 = 2L
     val epoch = 0.toShort
@@ -696,7 +696,7 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, brokerTopicStats = brokerTopicStats)
+    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
@@ -705,7 +705,7 @@ class LogTest {
   @Test
   def testLoadProducersAfterDeleteRecordsOnSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val pid2 = 2L
     val epoch = 0.toShort
@@ -731,7 +731,7 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L, brokerTopicStats = brokerTopicStats)
+    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2)
     assertEquals(retainedLastSeqOpt, reloadedEntryOpt)
@@ -741,7 +741,7 @@ class LogTest {
   def testProducerIdMapTruncateFullyAndStartAt() {
     val records = TestUtils.singletonRecords("foo".getBytes)
     val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
 
@@ -764,7 +764,7 @@ class LogTest {
     val pid1 = 1L
     val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0)
     val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
 
@@ -788,7 +788,7 @@ class LogTest {
   @Test
   def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
     log.roll(1L)
     assertEquals(Some(1L), log.latestProducerSnapshotOffset)
@@ -821,7 +821,7 @@ class LogTest {
   def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = {
     val producerId = 1L
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
       producerId = producerId, producerEpoch = 0, sequence = 0),
@@ -853,7 +853,7 @@ class LogTest {
   @Test
   def testRebuildTransactionalState(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val pid = 137L
     val epoch = 5.toShort
@@ -874,7 +874,7 @@ class LogTest {
 
     log.close()
 
-    val reopenedLog = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val reopenedLog = createLog(logDir, logConfig)
     reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
@@ -897,7 +897,7 @@ class LogTest {
     val pid = 23L
     val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5)
     val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, brokerTopicStats = brokerTopicStats)
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs)
     val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes))
     log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0)
 
@@ -913,7 +913,7 @@ class LogTest {
   @Test
   def testDuplicateAppends(): Unit = {
     // create a log
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     val pid = 1L
     val epoch: Short = 0
 
@@ -987,7 +987,7 @@ class LogTest {
   @Test
   def testMultipleProducerIdsPerMemoryRecord() : Unit = {
     // create a log
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
 
     val epoch: Short = 0
     val buffer = ByteBuffer.allocate(512)
@@ -1033,7 +1033,7 @@ class LogTest {
   @Test
   def testDuplicateAppendToFollower() : Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch: Short = 0
     val pid = 1L
     val baseSequence = 0
@@ -1054,7 +1054,7 @@ class LogTest {
   @Test
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val pid1 = 1L
     val pid2 = 2L
@@ -1106,7 +1106,7 @@ class LogTest {
   @Test(expected = classOf[ProducerFencedException])
   def testOldProducerEpoch(): Unit = {
     // create a log
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     val pid = 1L
     val newEpoch: Short = 1
     val oldEpoch: Short = 0
@@ -1128,7 +1128,7 @@ class LogTest {
     val maxJitter = 20 * 60L
     // create a log
     val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     log.appendAsLeader(set, leaderEpoch = 0)
 
@@ -1153,7 +1153,7 @@ class LogTest {
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
     // create a log
     val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -1168,7 +1168,7 @@ class LogTest {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0)
   }
 
@@ -1178,7 +1178,7 @@ class LogTest {
   @Test
   def testAppendAndReadWithSequentialOffsets() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 71)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -1202,7 +1202,7 @@ class LogTest {
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 72)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -1226,7 +1226,7 @@ class LogTest {
   @Test
   def testReadAtLogGap() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 300)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -1242,7 +1242,7 @@ class LogTest {
   @Test(expected = classOf[KafkaStorageException])
   def testLogRollAfterLogHandlerClosed() {
     val logConfig = LogTest.createLogConfig()
-    val log = createLog(logDir,  logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir,  logConfig)
     log.closeHandlers()
     log.roll(1)
   }
@@ -1250,7 +1250,7 @@ class LogTest {
   @Test
   def testReadWithMinMessage() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 72)
-    val log = createLog(logDir,  logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir,  logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -1277,7 +1277,7 @@ class LogTest {
   @Test
   def testReadWithTooSmallMaxLength() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 72)
-    val log = createLog(logDir,  logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir,  logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -1311,7 +1311,7 @@ class LogTest {
     createEmptyLogs(logDir, 1024)
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0,
@@ -1343,7 +1343,7 @@ class LogTest {
   def testLogRolls() {
     /* create a multipart log with 100 messages */
     val logConfig = LogTest.createLogConfig(segmentBytes = 100)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = mockTime.milliseconds))
@@ -1381,7 +1381,7 @@ class LogTest {
   def testCompressedMessages() {
     /* this log should roll after every messageset */
     val logConfig = LogTest.createLogConfig(segmentBytes = 110)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
@@ -1405,7 +1405,7 @@ class LogTest {
       logDir.mkdirs()
       // first test a log segment starting at 0
       val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs = 0)
-      val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+      val log = createLog(logDir, logConfig)
       for(i <- 0 until messagesToAppend)
         log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0)
 
@@ -1439,7 +1439,7 @@ class LogTest {
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
     val logConfig = LogTest.createLogConfig(segmentBytes = configSegmentSize)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     try {
       log.appendAsLeader(messageSet, leaderEpoch = 0)
@@ -1464,7 +1464,7 @@ class LogTest {
     val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage)
 
     val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     try {
       log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
@@ -1505,7 +1505,7 @@ class LogTest {
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
     val logConfig = LogTest.createLogConfig(maxMessageBytes = maxMessageSize)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // should be able to append the small message
     log.appendAsLeader(first, leaderEpoch = 0)
@@ -1527,7 +1527,7 @@ class LogTest {
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
     val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096)
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
@@ -1554,12 +1554,12 @@ class LogTest {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = createLog(logDir, logConfig, recoveryPoint = lastOffset, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig, recoveryPoint = lastOffset)
     verifyRecoveredLog(log, lastOffset)
     log.close()
 
     // test recovery case
-    log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig)
     verifyRecoveredLog(log, lastOffset)
     log.close()
   }
@@ -1571,7 +1571,7 @@ class LogTest {
   def testBuildTimeIndexWhenNotAssigningOffsets() {
     val numMessages = 100
     val logConfig = LogTest.createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(mockTime.milliseconds + i, i.toString.getBytes()))
@@ -1591,7 +1591,7 @@ class LogTest {
     // publish the messages and close the log
     val numMessages = 200
     val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.offsetIndex.file)
@@ -1603,7 +1603,7 @@ class LogTest {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.offsetIndex.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -1625,7 +1625,7 @@ class LogTest {
     val numMessages = 200
     val segmentSize = 200
     val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0")
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     for (i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
         timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
@@ -1636,7 +1636,7 @@ class LogTest {
     timeIndexFiles.foreach(file => Files.delete(file.toPath))
 
     // The rebuilt time index should be empty
-    log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1)
     for (segment <- log.logSegments.init) {
       assertEquals("The time index should be empty", 0, segment.timeIndex.entries)
       assertEquals("The time index file size should be 0", 0, segment.timeIndex.file.length)
@@ -1651,7 +1651,7 @@ class LogTest {
     // publish the messages and close the log
     val numMessages = 200
     val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.offsetIndex.file)
@@ -1673,7 +1673,7 @@ class LogTest {
     }
 
     // reopen the log
-    log = createLog(logDir, logConfig, recoveryPoint = 200L, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig, recoveryPoint = 200L)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -1697,7 +1697,7 @@ class LogTest {
 
     // create a log
     val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
@@ -1749,7 +1749,7 @@ class LogTest {
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -1788,7 +1788,7 @@ class LogTest {
 
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
@@ -1810,13 +1810,13 @@ class LogTest {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     // create a log
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000)
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for (_ <- 0 until 100)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     log.close()
-    log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -1831,7 +1831,7 @@ class LogTest {
     val asyncDeleteMs = 1000
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
                                     retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1864,7 +1864,7 @@ class LogTest {
   def testOpenDeletesObsoleteFiles() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1874,13 +1874,13 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     log.close()
-    log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
 
   @Test
   def testAppendMessageWithNullPayload() {
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
     val head = log.readUncommitted(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
@@ -1889,7 +1889,7 @@ class LogTest {
 
   @Test
   def testAppendWithOutOfOrderOffsetsThrowsException() {
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
 
     val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
     val buffer = ByteBuffer.allocate(512)
@@ -1910,7 +1910,7 @@ class LogTest {
 
   @Test
   def testAppendBelowExpectedOffsetThrowsException() {
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
 
@@ -1957,7 +1957,7 @@ class LogTest {
 
   @Test
   def testAppendWithNoTimestamp(): Unit = {
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
   }
@@ -1971,7 +1971,7 @@ class LogTest {
     for (_ <- 0 until 10) {
       // create a log and write some messages to it
       logDir.mkdirs()
-      var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+      var log = createLog(logDir, logConfig)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2005,7 +2005,7 @@ class LogTest {
   def testOverCompactedLogRecovery(): Unit = {
     // append some messages to create some segments
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
     val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
@@ -2038,7 +2038,7 @@ class LogTest {
   def testOverCompactedLogRecoveryMultiRecord(): Unit = {
     // append some messages to create some segments
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, 0,
       new SimpleRecord("v3".getBytes(), "k3".getBytes()),
@@ -2077,7 +2077,7 @@ class LogTest {
   def testOverCompactedLogRecoveryMultiRecordV1(): Unit = {
     // append some messages to create some segments
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val set1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0, CompressionType.NONE,
       new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP,
@@ -2292,7 +2292,7 @@ class LogTest {
     assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
     var recoveryPoint = 0L
     // create a log and write some messages to it
-    var log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    var log = createLog(logDir, logConfig)
     for (_ <- 0 until 100)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     log.close()
@@ -2300,7 +2300,7 @@ class LogTest {
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
     // clean shutdown file exists.
     recoveryPoint = log.logEndOffset
-    log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    log = createLog(logDir, logConfig)
     assertEquals(recoveryPoint, log.logEndOffset)
     Utils.delete(cleanShutdownFile)
   }
@@ -2461,7 +2461,7 @@ class LogTest {
   def testDeleteOldSegments() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -2511,7 +2511,7 @@ class LogTest {
   def testLogDeletionAfterClose() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2529,7 +2529,7 @@ class LogTest {
   def testLogDeletionAfterDeleteRecords() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2561,7 +2561,7 @@ class LogTest {
   def shouldDeleteSizeBasedSegments() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2576,7 +2576,7 @@ class LogTest {
   def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2591,7 +2591,7 @@ class LogTest {
   def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2606,7 +2606,7 @@ class LogTest {
   def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2621,7 +2621,7 @@ class LogTest {
   def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2640,7 +2640,7 @@ class LogTest {
   def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete")
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2657,7 +2657,7 @@ class LogTest {
 
     //Given this partition is on leader epoch 72
     val epoch = 72
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     log.leaderEpochCache.assign(epoch, records.size)
 
     //When appending messages as a leader (i.e. assignOffsets = true)
@@ -2689,7 +2689,7 @@ class LogTest {
       recs
     }
 
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
 
     //When appending as follower (assignOffsets = false)
     for (i <- records.indices)
@@ -2702,7 +2702,7 @@ class LogTest {
   def shouldTruncateLeaderEpochsWhenDeletingSegments() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
     // Given three segments of 5 messages each
@@ -2727,7 +2727,7 @@ class LogTest {
   def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
     val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
     // Given three segments of 5 messages each
@@ -2752,7 +2752,7 @@ class LogTest {
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
     //Given 2 segments, 10 messages per segment
@@ -2797,7 +2797,7 @@ class LogTest {
    */
   @Test
   def testLogRecoversForLeaderEpoch() {
-    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, LogConfig())
     val leaderEpochCache = epochCache(log)
     val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
     log.appendAsFollower(records = firstBatch)
@@ -2819,7 +2819,7 @@ class LogTest {
     log.close()
 
     // reopen the log and recover from the beginning
-    val recoveredLog = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    val recoveredLog = createLog(logDir, LogConfig())
     val recoveredLeaderEpochCache = epochCache(recoveredLog)
 
     // epoch entries should be recovered
@@ -2848,7 +2848,7 @@ class LogTest {
 
   def testFirstUnstableOffsetNoTransactionalData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord("foo".getBytes),
@@ -2862,7 +2862,7 @@ class LogTest {
   @Test
   def testFirstUnstableOffsetWithTransactionalData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val pid = 137L
     val epoch = 5.toShort
@@ -2900,7 +2900,7 @@ class LogTest {
   @Test
   def testTransactionIndexUpdated(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -2941,7 +2941,7 @@ class LogTest {
   @Test
   def testFullTransactionIndexRecovery(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -2984,7 +2984,7 @@ class LogTest {
     log.close()
 
     val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
-    val reloadedLog = createLog(logDir, reloadedLogConfig, brokerTopicStats = brokerTopicStats)
+    val reloadedLog = createLog(logDir, reloadedLogConfig)
     val abortedTransactions = allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
   }
@@ -2992,7 +2992,7 @@ class LogTest {
   @Test
   def testRecoverOnlyLastSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -3035,7 +3035,7 @@ class LogTest {
     log.close()
 
     val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
-    val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, brokerTopicStats = brokerTopicStats)
+    val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint)
     val abortedTransactions = allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
   }
@@ -3043,7 +3043,7 @@ class LogTest {
   @Test
   def testRecoverLastSegmentWithNoSnapshots(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -3089,7 +3089,7 @@ class LogTest {
     log.close()
 
     val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5)
-    val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint, brokerTopicStats = brokerTopicStats)
+    val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint)
     val abortedTransactions = allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
   }
@@ -3098,7 +3098,7 @@ class LogTest {
   def testTransactionIndexUpdatedThroughReplication(): Unit = {
     val epoch = 0.toShort
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val buffer = ByteBuffer.allocate(2048)
 
     val pid1 = 1L
@@ -3144,7 +3144,7 @@ class LogTest {
     val pid = 1L
     val epoch = 0.toShort
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val append = appendTransactionalAsLeader(log, pid, epoch)
 
@@ -3160,7 +3160,7 @@ class LogTest {
   @Test
   def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
     val pid = 1L
     val appendPid = appendTransactionalAsLeader(log, pid, epoch)
@@ -3184,7 +3184,7 @@ class LogTest {
   @Test
   def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
     val pid = 1L
     val appendPid = appendTransactionalAsLeader(log, pid, epoch)
@@ -3211,7 +3211,7 @@ class LogTest {
   @Test
   def testLastStableOffsetWithMixedProducerData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     // for convenience, both producers share the same epoch
     val epoch = 5.toShort
@@ -3272,7 +3272,7 @@ class LogTest {
       new SimpleRecord("c".getBytes))
 
     val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes)
-    val log = createLog(logDir, logConfig, brokerTopicStats = brokerTopicStats)
+    val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
     assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
@@ -3487,7 +3487,7 @@ object LogTest {
         createLogConfig(indexIntervalBytes = 1)
 
     var log = createLog(logDir, logConfig, brokerTopicStats, scheduler, time)
-    var inputRecords = ListBuffer[Record]()
+    val inputRecords = ListBuffer[Record]()
 
     // References to files we want to "merge" to emulate offset overflow
     val toMerge = ListBuffer[File]()
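
The long run of LogTest changes above is mechanical: the createLog test helper already supplies brokerTopicStats (and the other trailing parameters) as defaults, so passing it explicitly at every call site was redundant. A tiny self-contained sketch of the Scala default-parameter pattern these call sites rely on (names are illustrative, not the real helper):

    // Illustrative only: defaulted trailing parameters let call sites shrink.
    case class FakeLog(dir: String, logStartOffset: Long, recoveryPoint: Long)

    def createLog(dir: String,
                  logStartOffset: Long = 0L,
                  recoveryPoint: Long = 0L): FakeLog =
      FakeLog(dir, logStartOffset, recoveryPoint)

    val full    = createLog("/tmp/log-a", 0L, 100L)             // every argument spelled out
    val trimmed = createLog("/tmp/log-a", recoveryPoint = 100L) // named arg, defaults elsewhere
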
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index aa8236a..dd4f7e3 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -21,71 +21,56 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, Random}
 
-import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
 import kafka.log.{Log, LogSegment}
-import kafka.utils.TestUtils._
-import kafka.utils._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.network.SocketServer
+import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, ListOffsetRequest, ListOffsetResponse}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-class LogOffsetTest extends ZooKeeperTestHarness {
-  val random = new Random()
-  var logDir: File = null
-  var topicLogDir: File = null
-  var server: KafkaServer = null
-  var logSize: Int = 140
-  var simpleConsumer: SimpleConsumer = null
-  var time: Time = new MockTime()
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val config: Properties = createBrokerConfig(1)
-    config.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
-    val logDirPath = config.getProperty("log.dir")
-    logDir = new File(logDirPath)
-    time = new MockTime()
-    server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "")
-  }
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class LogOffsetTest extends BaseRequestTest {
+
+  private lazy val time = new MockTime
+
+  protected override def numBrokers = 1
 
-  @After
-  override def tearDown() {
-    simpleConsumer.close
-    TestUtils.shutdownServers(Seq(server))
-    super.tearDown()
+  protected override def brokerTime(brokerId: Int) = time
+
+  protected override def propertyOverrides(props: Properties): Unit = {
+    props.put("log.flush.interval.messages", "1")
+    props.put("num.partitions", "20")
+    props.put("log.retention.hours", "10")
+    props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
+    props.put("log.segment.bytes", "140")
   }
 
   @Test
   def testGetOffsetsForUnknownTopic() {
-    val topicAndPartition = TopicAndPartition("foo", 0)
-    val request = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
-    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
+    val topicPartition = new TopicPartition("foo", 0)
+    val request = ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+      .setOffsetData(Map(topicPartition ->
+        new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 10)).asJava).build(0)
+    val response = sendListOffsetsRequest(request)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData.get(topicPartition).error)
   }
 
   @Test
   def testGetOffsetsAfterDeleteRecords() {
-    val topicPartition = "kafka-" + 0
-    val topic = topicPartition.split("-").head
-    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
 
-    // setup brokers in ZooKeeper as owners of partitions for this test
     adminZkClient.createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
-    waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
                   "Log for partition [topic,0] should be created")
-    val log = logManager.getLog(new TopicPartition(topic, part)).get
+    val log = logManager.getLog(topicPartition).get
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -95,93 +80,87 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     log.maybeIncrementLogStartOffset(3)
     log.deleteOldSegments()
 
-    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+    val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
 
-    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
-    val topicAndPartition = TopicAndPartition(topic, part)
-    val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
-      replicaId = 0)
-    val consumerOffsets =
-      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+      "Leader should be elected")
+    val request = ListOffsetRequest.Builder.forReplica(0, 0)
+      .setOffsetData(Map(topicPartition ->
+        new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build()
+    val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
   @Test
   def testGetOffsetsBeforeLatestTime() {
-    val topicPartition = "kafka-" + 0
-    val topic = topicPartition.split("-").head
-    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
 
-    // setup brokers in ZooKeeper as owners of partitions for this test
     adminZkClient.createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
-    waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
-      "Log for partition [topic,0] should be created")
-    val log = logManager.getLog(new TopicPartition(topic, part)).get
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      s"Log for partition $topicPartition should be created")
+    val log = logManager.getLog(topicPartition).get
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
-    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+    val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
-    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
-    val topicAndPartition = TopicAndPartition(topic, part)
-    val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
-      replicaId = 0)
-    val consumerOffsets =
-      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+      "Leader should be elected")
+    val request = ListOffsetRequest.Builder.forReplica(0, 0)
+      .setOffsetData(Map(topicPartition ->
+        new ListOffsetRequest.PartitionData(ListOffsetRequest.LATEST_TIMESTAMP, 15)).asJava).build()
+    val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
-    val fetchResponse = simpleConsumer.fetch(
-      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
-    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
+    val fetchRequest = FetchRequest.Builder.forConsumer(0, 1,
+      Map(topicPartition -> new FetchRequest.PartitionData(consumerOffsets.head, FetchRequest.INVALID_LOG_START_OFFSET,
+        300 * 1024)).asJava).build()
+    val fetchResponse = sendFetchRequest(fetchRequest)
+    assertFalse(fetchResponse.responseData.get(topicPartition).records.batches.iterator.hasNext)
   }
 
   @Test
   def testEmptyLogsGetOffsets() {
-    val topicPartition = "kafka-" + random.nextInt(10)
-    val topicPartitionPath = TestUtils.tempDir().getAbsolutePath + "/" + topicPartition
-    topicLogDir = new File(topicPartitionPath)
+    val random = new Random
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, random.nextInt(10))
+    val topicPartitionPath = s"${TestUtils.tempDir().getAbsolutePath}/$topic-${topicPartition.partition}"
+    val topicLogDir = new File(topicPartitionPath)
     topicLogDir.mkdir()
 
-    val topic = topicPartition.split("-").head
-
-    // setup brokers in ZooKeeper as owners of partitions for this test
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    createTopic(topic, numPartitions = 1, replicationFactor = 1)
 
     var offsetChanged = false
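+    // The log is empty, so the earliest offset should stay at 0 on every iteration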
     for (_ <- 1 to 14) {
-      val topicAndPartition = TopicAndPartition(topic, 0)
-      val offsetRequest =
-        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
-      val consumerOffsets =
-        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-
-      if(consumerOffsets.head == 1) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val request = ListOffsetRequest.Builder.forReplica(0, 0)
+        .setOffsetData(Map(topicPartition ->
+          new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 1)).asJava).build()
+      val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
+      if (consumerOffsets.head == 1)
         offsetChanged = true
-      }
     }
     assertFalse(offsetChanged)
   }
 
   @Test
   def testGetOffsetsBeforeNow() {
-    val topicPartition = "kafka-" + random.nextInt(3)
-    val topic = topicPartition.split("-").head
-    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+    val random = new Random
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, random.nextInt(3))
 
-    // setup brokers in ZooKeeper as owners of partitions for this test
     adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.initialDefaultConfig)
+    val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig)
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -189,42 +168,42 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 
-    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), now, 15)
+    val offsets = server.apis.fetchOffsets(logManager, topicPartition, now, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
-    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
-    val topicAndPartition = TopicAndPartition(topic, part)
-    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 15)), replicaId = 0)
-    val consumerOffsets =
-      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+      "Leader should be elected")
+    val request = ListOffsetRequest.Builder.forReplica(0, 0)
+      .setOffsetData(Map(topicPartition ->
+        new ListOffsetRequest.PartitionData(now, 15)).asJava).build()
+    val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
   }
 
   @Test
   def testGetOffsetsBeforeEarliestTime() {
-    val topicPartition = "kafka-" + random.nextInt(3)
-    val topic = topicPartition.split("-").head
-    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+    val random = new Random
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, random.nextInt(3))
 
-    // setup brokers in ZooKeeper as owners of partitions for this test
     adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.initialDefaultConfig)
+    val log = logManager.getOrCreateLog(topicPartition, logManager.initialDefaultConfig)
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
-    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10)
+    val offsets = server.apis.fetchOffsets(logManager, topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, 10)
 
     assertEquals(Seq(0L), offsets)
 
-    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
-    val topicAndPartition = TopicAndPartition(topic, part)
-    val offsetRequest =
-      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
-    val consumerOffsets =
-      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+      "Leader should be elected")
+    val request = ListOffsetRequest.Builder.forReplica(0, 0)
+      .setOffsetData(Map(topicPartition ->
+        new ListOffsetRequest.PartitionData(ListOffsetRequest.EARLIEST_TIMESTAMP, 10)).asJava).build()
+    val consumerOffsets = sendListOffsetsRequest(request).responseData.get(topicPartition).offsets.asScala
     assertEquals(Seq(0L), consumerOffsets)
   }
 
@@ -264,19 +243,16 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
   }
 
-  private def createBrokerConfig(nodeId: Int): Properties = {
-    val props = new Properties
-    props.put("broker.id", nodeId.toString)
-    props.put("port", TestUtils.RandomPort.toString())
-    props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
-    props.put("log.flush.interval.messages", "1")
-    props.put("enable.zookeeper", "false")
-    props.put("num.partitions", "20")
-    props.put("log.retention.hours", "10")
-    props.put("log.retention.check.interval.ms", (5*1000*60).toString)
-    props.put("log.segment.bytes", logSize.toString)
-    props.put("zookeeper.connect", zkConnect.toString)
-    props
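+  // This test runs a single broker (numBrokers == 1)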
+  private def server: KafkaServer = servers.head
+
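+  // Send over a raw socket to any broker and parse the response at the request's version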
+  private def sendListOffsetsRequest(request: ListOffsetRequest, destination: Option[SocketServer] = None): ListOffsetResponse = {
+    val response = connectAndSend(request, ApiKeys.LIST_OFFSETS, destination = destination.getOrElse(anySocketServer))
+    ListOffsetResponse.parse(response, request.version)
+  }
+
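+  // Same pattern for fetch requests, with MemoryRecords as the record payload type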
+  private def sendFetchRequest(request: FetchRequest, destination: Option[SocketServer] = None): FetchResponse[MemoryRecords] = {
+    val response = connectAndSend(request, ApiKeys.FETCH, destination = destination.getOrElse(anySocketServer))
+    FetchResponse.parse(response, request.version)
   }
 
 }
