You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/30 22:39:53 UTC

kafka git commit: KAFKA-5522; ListOffsets should bound timestamp search by LSO in read_committed

Repository: kafka
Updated Branches:
  refs/heads/trunk 9c2de4920 -> 9238aeaa2


KAFKA-5522; ListOffsets should bound timestamp search by LSO in read_committed

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3456 from hachikuji/KAFKA-5522


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9238aeaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9238aeaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9238aeaa

Branch: refs/heads/trunk
Commit: 9238aeaa2171f7860fd6ae7980608505a63c12ca
Parents: 9c2de49
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 30 15:38:03 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jun 30 15:38:03 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Replica.scala |   2 +-
 core/src/main/scala/kafka/log/IndexEntry.scala  |   6 +
 .../src/main/scala/kafka/server/KafkaApis.scala |  45 ++--
 .../kafka/api/PlaintextConsumerTest.scala       |  16 +-
 .../kafka/api/TransactionsTest.scala            |  41 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala | 235 +++++++++++++++----
 6 files changed, 250 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e3b1f2d..8f08089 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -142,7 +142,7 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def highWatermark = highWatermarkMetadata
+  def highWatermark: LogOffsetMetadata = highWatermarkMetadata
 
   /**
    * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."

http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/main/scala/kafka/log/IndexEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala
index 2f5a6a7..c8dd200 100644
--- a/core/src/main/scala/kafka/log/IndexEntry.scala
+++ b/core/src/main/scala/kafka/log/IndexEntry.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import org.apache.kafka.common.requests.ListOffsetResponse
+
 sealed trait IndexEntry {
   // We always use Long for both key and value to avoid boxing.
   def indexKey: Long
@@ -44,3 +46,7 @@ case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
   override def indexKey = timestamp
   override def indexValue = offset
 }
+
+object TimestampOffset {
+  val Unknown = TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 68c34d7..4f90421 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -631,17 +631,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
     if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
 
-  /**
-   * Handle an offset request
-   */
   def handleListOffsetRequest(request: RequestChannel.Request) {
     val version = request.header.apiVersion()
 
-    val mergedResponseMap =
-      if (version == 0)
-        handleListOffsetRequestV0(request)
-      else
-        handleListOffsetRequestV1AndAbove(request)
+    val mergedResponseMap = if (version == 0)
+      handleListOffsetRequestV0(request)
+    else
+      handleListOffsetRequestV1AndAbove(request)
 
     sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
   }
@@ -721,30 +717,31 @@ class KafkaApis(val requestChannel: RequestChannel,
                                                               ListOffsetResponse.UNKNOWN_OFFSET))
       } else {
         try {
-          val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
-
           // ensure leader exists
           val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
             replicaManager.getLeaderReplicaIfLocal(topicPartition)
           else
             replicaManager.getReplicaOrException(topicPartition)
 
-          val found = {
-            if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
-              val lastFetchableOffset = offsetRequest.isolationLevel match {
-                case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
-                case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
-              }
+          val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
+          val found = if (fromConsumer) {
+            val lastFetchableOffset = offsetRequest.isolationLevel match {
+              case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
+              case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
+            }
+
+            if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
               TimestampOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset)
-            } else {
+            else {
               def allowed(timestampOffset: TimestampOffset): Boolean =
-                !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset
+                timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP || timestampOffset.offset < lastFetchableOffset
 
-              fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
-                case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
-                case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
-              }
+              fetchOffsetForTimestamp(topicPartition, timestamp)
+                .filter(allowed).getOrElse(TimestampOffset.Unknown)
             }
+          } else {
+            fetchOffsetForTimestamp(topicPartition, timestamp)
+              .getOrElse(TimestampOffset.Unknown)
           }
 
           (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset))
@@ -782,8 +779,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
-    logManager.getLog(topicPartition) match {
+  private def fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset] = {
+    replicaManager.getLog(topicPartition) match {
       case Some(log) =>
         log.fetchOffsetsByTimestamp(timestamp)
       case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index df49811..aad9b6a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -42,9 +42,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testHeaders() {
     val numRecords = 1
-    val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+    val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
     
-    record.headers().add(s"headerKey", s"headerValue".getBytes)
+    record.headers().add("headerKey", "headerValue".getBytes)
     
     this.producers.head.send(record)
     
@@ -59,22 +59,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     for (i <- 0 until numRecords) {
       val record = records(i)
-      val header = record.headers().lastHeader(s"headerKey")
-      assertEquals(s"headerValue", if (header == null) null else new String(header.value()))
+      val header = record.headers().lastHeader("headerKey")
+      assertEquals("headerValue", if (header == null) null else new String(header.value()))
     }
   }
   
   @Test
   def testHeadersExtendedSerializerDeserializer() {
     val numRecords = 1
-    val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+    val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
 
     val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
       
       var serializer = new ByteArraySerializer()
       
       override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
-        headers.add(s"content-type", s"application/octet-stream".getBytes)
+        headers.add("content-type", "application/octet-stream".getBytes)
         serializer.serialize(topic, data)
       }
 
@@ -94,8 +94,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       var deserializer = new ByteArrayDeserializer()
       
       override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
-        var header = headers.lastHeader(s"content-type")
-        assertEquals(s"application/octet-stream", if (header == null) null else new String(header.value()))
+        val header = headers.lastHeader("content-type")
+        assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
         deserializer.deserialize(topic, data)
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index af241b7..0e57e53 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import java.lang.{Long => JLong}
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
@@ -124,27 +125,39 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     producer1.beginTransaction()
     producer2.beginTransaction()
-    producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "1".getBytes))
-    producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "1".getBytes))
+
+    val latestVisibleTimestamp = System.currentTimeMillis()
+    producer2.send(new ProducerRecord(topic1, 0, latestVisibleTimestamp, "x".getBytes, "1".getBytes))
+    producer2.send(new ProducerRecord(topic2, 0, latestVisibleTimestamp, "x".getBytes, "1".getBytes))
     producer2.flush()
 
-    producer1.send(new ProducerRecord(topic1, 0, "a".getBytes, "1".getBytes))
-    producer1.send(new ProducerRecord(topic1, 0, "b".getBytes, "2".getBytes))
-    producer1.send(new ProducerRecord(topic2, 0, "c".getBytes, "3".getBytes))
-    producer1.send(new ProducerRecord(topic2, 0, "d".getBytes, "4".getBytes))
+    val latestWrittenTimestamp = latestVisibleTimestamp + 1
+    producer1.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "a".getBytes, "1".getBytes))
+    producer1.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "b".getBytes, "2".getBytes))
+    producer1.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "c".getBytes, "3".getBytes))
+    producer1.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "d".getBytes, "4".getBytes))
     producer1.flush()
 
-    producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "2".getBytes))
-    producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "2".getBytes))
+    producer2.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "x".getBytes, "2".getBytes))
+    producer2.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "x".getBytes, "2".getBytes))
     producer2.commitTransaction()
 
     // ensure the records are visible to the read uncommitted consumer
-    readUncommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+    val tp1 = new TopicPartition(topic1, 0)
+    val tp2 = new TopicPartition(topic2, 0)
+    readUncommittedConsumer.assign(Set(tp1, tp2).asJava)
     consumeRecords(readUncommittedConsumer, 8)
+    val readUncommittedOffsetsForTimes = readUncommittedConsumer.offsetsForTimes(Map(
+      tp1 -> (latestWrittenTimestamp: JLong),
+      tp2 -> (latestWrittenTimestamp: JLong)
+    ).asJava)
+    assertEquals(2, readUncommittedOffsetsForTimes.size)
+    assertEquals(latestWrittenTimestamp, readUncommittedOffsetsForTimes.get(tp1).timestamp)
+    assertEquals(latestWrittenTimestamp, readUncommittedOffsetsForTimes.get(tp2).timestamp)
     readUncommittedConsumer.unsubscribe()
 
     // we should only see the first two records which come before the undecided second transaction
-    readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+    readCommittedConsumer.assign(Set(tp1, tp2).asJava)
     val records = consumeRecords(readCommittedConsumer, 2)
     records.foreach { record =>
       assertEquals("x", new String(record.key))
@@ -157,6 +170,14 @@ class TransactionsTest extends KafkaServerTestHarness {
     readCommittedConsumer.assignment.asScala.foreach { tp =>
       assertEquals(1L, readCommittedConsumer.position(tp))
     }
+
+    // undecided timestamps should not be searchable either
+    val readCommittedOffsetsForTimes = readCommittedConsumer.offsetsForTimes(Map(
+      tp1 -> (latestWrittenTimestamp: JLong),
+      tp2 -> (latestWrittenTimestamp: JLong)
+    ).asJava)
+    assertNull(readCommittedOffsetsForTimes.get(tp1))
+    assertNull(readCommittedOffsetsForTimes.get(tp2))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fb17543..b8d8cc6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,14 +17,16 @@
 
 package unit.kafka.server
 
-
+import java.lang.{Long => JLong}
 import java.net.InetAddress
 import java.util
 
-import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
+import kafka.cluster.Replica
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.{Log, TimestampOffset}
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Authorizer
@@ -43,12 +45,12 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Utils
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{Assert, Before, Test}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
-
 class KafkaApisTest {
 
   private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
@@ -70,9 +72,7 @@ class KafkaApisTest {
   private val clusterId = "clusterId"
   private val time = new MockTime
 
-
-
-  def createKafkaApis(interBrokerProtocolVersion: ApiVersion): KafkaApis = {
+  def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = {
     val properties = TestUtils.createBrokerConfig(brokerId, "zk")
     properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
     properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
@@ -123,7 +123,7 @@ class KafkaApisTest {
   @Test
   def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
     val topicPartition = new TopicPartition("t", 0)
-    val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
     val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
     val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
 
@@ -132,28 +132,18 @@ class KafkaApisTest {
     EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
     EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
 
-    createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
-
-    val send = capturedResponse.getValue.responseSend.get
-    val channel = new ByteBufferChannel(send.size())
-    send.writeTo(channel)
-    channel.close()
-
-    // read the size
-    channel.buffer.getInt()
-
-    val responseHeader = ResponseHeader.parse(channel.buffer)
-    val struct = ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer)
+    createKafkaApis().handleWriteTxnMarkersRequest(request)
 
-    val markersResponse = new WriteTxnMarkersResponse(struct)
-    Assert.assertEquals(expectedErrors, markersResponse.errors(1))
+    val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
+      .asInstanceOf[WriteTxnMarkersResponse]
+    assertEquals(expectedErrors, markersResponse.errors(1))
   }
 
   @Test
   def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
     val tp2 = new TopicPartition("t1", 0)
-    val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
+    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
     val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
 
     val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
@@ -164,7 +154,6 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.getMagic(tp2))
       .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
-
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.eq(true),
@@ -180,29 +169,18 @@ class KafkaApisTest {
     EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
     EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
 
+    createKafkaApis().handleWriteTxnMarkersRequest(request)
 
-    createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
-
-    val send = capturedResponse.getValue.responseSend.get
-    val channel = new ByteBufferChannel(send.size())
-    send.writeTo(channel)
-    channel.close()
-
-    // read the size
-    channel.buffer.getInt()
-
-    val responseHeader = ResponseHeader.parse(channel.buffer)
-    val struct = ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer)
-
-    val markersResponse = new WriteTxnMarkersResponse(struct)
-    Assert.assertEquals(expectedErrors, markersResponse.errors(1))
+    val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
+      .asInstanceOf[WriteTxnMarkersResponse]
+    assertEquals(expectedErrors, markersResponse.errors(1))
     EasyMock.verify(replicaManager)
   }
 
   @Test
   def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
     val topicPartition = new TopicPartition("t", 0)
-    val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+    val request = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))._2
     EasyMock.expect(replicaManager.getMagic(topicPartition))
       .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
 
@@ -216,24 +194,177 @@ class KafkaApisTest {
 
     EasyMock.replay(replicaManager)
 
-    createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
+    createKafkaApis().handleWriteTxnMarkersRequest(request)
     EasyMock.verify(replicaManager)
   }
 
+  @Test
+  def testReadUncommittedConsumerListOffsetLimitedAtHighWatermark(): Unit = {
+    testConsumerListOffsetLimit(IsolationLevel.READ_UNCOMMITTED)
+  }
+
+  @Test
+  def testReadCommittedConsumerListOffsetLimitedAtLastStableOffset(): Unit = {
+    testConsumerListOffsetLimit(IsolationLevel.READ_COMMITTED)
+  }
+
+  private def testConsumerListOffsetLimit(isolationLevel: IsolationLevel): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val timestamp: JLong = time.milliseconds()
+    val limitOffset = 15L
+
+    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+    val replica = EasyMock.mock(classOf[Replica])
+    val log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED)
+      EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+    else
+      EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+    EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
+    EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp = timestamp, offset = limitOffset)))
+    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
+
+    val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
+      .setTargetTimes(Map(tp -> timestamp).asJava)
+    val (listOffsetRequest, request) = buildRequest(builder)
+    createKafkaApis().handleListOffsetRequest(request)
+
+    val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
+    assertTrue(response.responseData.containsKey(tp))
+
+    val partitionData = response.responseData.get(tp)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(ListOffsetResponse.UNKNOWN_OFFSET, partitionData.offset)
+    assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
+  }
+
+  @Test
+  def testReadUncommittedConsumerListOffsetEarliestOffsetEqualsHighWatermark(): Unit = {
+    testConsumerListOffsetEarliestOffsetEqualsLimit(IsolationLevel.READ_UNCOMMITTED)
+  }
+
+  @Test
+  def testReadCommittedConsumerListOffsetEarliestOffsetEqualsLastStableOffset(): Unit = {
+    testConsumerListOffsetEarliestOffsetEqualsLimit(IsolationLevel.READ_COMMITTED)
+  }
+
+  private def testConsumerListOffsetEarliestOffsetEqualsLimit(isolationLevel: IsolationLevel): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val limitOffset = 15L
+
+    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+    val replica = EasyMock.mock(classOf[Replica])
+    val log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED)
+      EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+    else
+      EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+    EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
+    EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
+      .andReturn(Some(TimestampOffset(timestamp = ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset)))
+    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
+
+    val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
+      .setTargetTimes(Map(tp -> (ListOffsetRequest.EARLIEST_TIMESTAMP: JLong)).asJava)
+    val (listOffsetRequest, request) = buildRequest(builder)
+    createKafkaApis().handleListOffsetRequest(request)
+
+    val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
+    assertTrue(response.responseData.containsKey(tp))
+
+    val partitionData = response.responseData.get(tp)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(limitOffset, partitionData.offset)
+    assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
+  }
+
+  @Test
+  def testReadUncommittedConsumerListOffsetLatest(): Unit = {
+    testConsumerListOffsetLatest(IsolationLevel.READ_UNCOMMITTED)
+  }
+
+  @Test
+  def testReadCommittedConsumerListOffsetLatest(): Unit = {
+    testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
+  }
+
+  private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val latestOffset = 15L
+
+    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+    val replica = EasyMock.mock(classOf[Replica])
+    val log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+    if (isolationLevel == IsolationLevel.READ_UNCOMMITTED)
+      EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
+    else
+      EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
+    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
+
+    val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
+      .setTargetTimes(Map(tp -> (ListOffsetRequest.LATEST_TIMESTAMP: JLong)).asJava)
+    val (listOffsetRequest, request) = buildRequest(builder)
+    createKafkaApis().handleListOffsetRequest(request)
+
+    val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
+    assertTrue(response.responseData.containsKey(tp))
+
+    val partitionData = response.responseData.get(tp)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertEquals(latestOffset, partitionData.offset)
+    assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
+  }
+
   private def createWriteTxnMarkersRequest(partitions: util.List[TopicPartition]) = {
-    val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(Utils.mkList(
-    new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions))).build()
-    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS.id, writeTxnMarkersRequest.version(), "", 0)
-    val byteBuffer = writeTxnMarkersRequest.serialize(header)
+    val requestBuilder = new WriteTxnMarkersRequest.Builder(Utils.mkList(
+      new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions)))
+    buildRequest(requestBuilder)
+  }
 
-    val request = RequestChannel.Request(1, "1",
-    Session(KafkaPrincipal.ANONYMOUS,
-    InetAddress.getLocalHost),
-    byteBuffer, 0,
-    new ListenerName(""),
-    SecurityProtocol.PLAINTEXT)
-    (writeTxnMarkersRequest, request)
+  private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T]): (T, RequestChannel.Request) = {
+    val request = builder.build()
+    val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
+    val buffer = request.serialize(header)
+    val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
+    (request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
   }
 
+  private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
+    val send = capturedResponse.getValue.responseSend.get
+    val channel = new ByteBufferChannel(send.size)
+    send.writeTo(channel)
+    channel.close()
+    channel.buffer.getInt() // read the size
+    ResponseHeader.parse(channel.buffer)
+    val struct = api.responseSchema(request.version).read(channel.buffer)
+    AbstractResponse.getResponse(api, struct)
+  }
+
+  private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: Capture[Int => Unit]): Unit = {
+    EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle(
+      EasyMock.anyString(),
+      EasyMock.anyString(),
+      EasyMock.anyLong(),
+      EasyMock.capture(capturedThrottleCallback),
+      EasyMock.anyObject[(Long => Unit) => Unit]()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          val callback = capturedThrottleCallback.getValue
+          callback(0)
+        }
+      })
+  }
 
 }