You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/30 22:39:53 UTC
kafka git commit: KAFKA-5522; ListOffsets should bound timestamp search by LSO in read_committed
Repository: kafka
Updated Branches:
refs/heads/trunk 9c2de4920 -> 9238aeaa2
KAFKA-5522; ListOffsets should bound timestamp search by LSO in read_committed
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3456 from hachikuji/KAFKA-5522
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9238aeaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9238aeaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9238aeaa
Branch: refs/heads/trunk
Commit: 9238aeaa2171f7860fd6ae7980608505a63c12ca
Parents: 9c2de49
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 30 15:38:03 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jun 30 15:38:03 2017 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Replica.scala | 2 +-
core/src/main/scala/kafka/log/IndexEntry.scala | 6 +
.../src/main/scala/kafka/server/KafkaApis.scala | 45 ++--
.../kafka/api/PlaintextConsumerTest.scala | 16 +-
.../kafka/api/TransactionsTest.scala | 41 +++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 235 +++++++++++++++----
6 files changed, 250 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e3b1f2d..8f08089 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -142,7 +142,7 @@ class Replica(val brokerId: Int,
}
}
- def highWatermark = highWatermarkMetadata
+ def highWatermark: LogOffsetMetadata = highWatermarkMetadata
/**
* The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/main/scala/kafka/log/IndexEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala
index 2f5a6a7..c8dd200 100644
--- a/core/src/main/scala/kafka/log/IndexEntry.scala
+++ b/core/src/main/scala/kafka/log/IndexEntry.scala
@@ -17,6 +17,8 @@
package kafka.log
+import org.apache.kafka.common.requests.ListOffsetResponse
+
sealed trait IndexEntry {
// We always use Long for both key and value to avoid boxing.
def indexKey: Long
@@ -44,3 +46,7 @@ case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
override def indexKey = timestamp
override def indexValue = offset
}
+
+object TimestampOffset {
+ val Unknown = TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 68c34d7..4f90421 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -631,17 +631,13 @@ class KafkaApis(val requestChannel: RequestChannel,
def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
- /**
- * Handle an offset request
- */
def handleListOffsetRequest(request: RequestChannel.Request) {
val version = request.header.apiVersion()
- val mergedResponseMap =
- if (version == 0)
- handleListOffsetRequestV0(request)
- else
- handleListOffsetRequestV1AndAbove(request)
+ val mergedResponseMap = if (version == 0)
+ handleListOffsetRequestV0(request)
+ else
+ handleListOffsetRequestV1AndAbove(request)
sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
}
@@ -721,30 +717,31 @@ class KafkaApis(val requestChannel: RequestChannel,
ListOffsetResponse.UNKNOWN_OFFSET))
} else {
try {
- val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
-
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
replicaManager.getLeaderReplicaIfLocal(topicPartition)
else
replicaManager.getReplicaOrException(topicPartition)
- val found = {
- if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
- val lastFetchableOffset = offsetRequest.isolationLevel match {
- case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
- case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
- }
+ val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
+ val found = if (fromConsumer) {
+ val lastFetchableOffset = offsetRequest.isolationLevel match {
+ case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
+ case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
+ }
+
+ if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
TimestampOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset)
- } else {
+ else {
def allowed(timestampOffset: TimestampOffset): Boolean =
- !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset
+ timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP || timestampOffset.offset < lastFetchableOffset
- fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
- case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
- case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
- }
+ fetchOffsetForTimestamp(topicPartition, timestamp)
+ .filter(allowed).getOrElse(TimestampOffset.Unknown)
}
+ } else {
+ fetchOffsetForTimestamp(topicPartition, timestamp)
+ .getOrElse(TimestampOffset.Unknown)
}
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset))
@@ -782,8 +779,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
- logManager.getLog(topicPartition) match {
+ private def fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset] = {
+ replicaManager.getLog(topicPartition) match {
case Some(log) =>
log.fetchOffsetsByTimestamp(timestamp)
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index df49811..aad9b6a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -42,9 +42,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testHeaders() {
val numRecords = 1
- val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+ val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
- record.headers().add(s"headerKey", s"headerValue".getBytes)
+ record.headers().add("headerKey", "headerValue".getBytes)
this.producers.head.send(record)
@@ -59,22 +59,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
for (i <- 0 until numRecords) {
val record = records(i)
- val header = record.headers().lastHeader(s"headerKey")
- assertEquals(s"headerValue", if (header == null) null else new String(header.value()))
+ val header = record.headers().lastHeader("headerKey")
+ assertEquals("headerValue", if (header == null) null else new String(header.value()))
}
}
@Test
def testHeadersExtendedSerializerDeserializer() {
val numRecords = 1
- val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+ val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
var serializer = new ByteArraySerializer()
override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
- headers.add(s"content-type", s"application/octet-stream".getBytes)
+ headers.add("content-type", "application/octet-stream".getBytes)
serializer.serialize(topic, data)
}
@@ -94,8 +94,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
var deserializer = new ByteArrayDeserializer()
override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
- var header = headers.lastHeader(s"content-type")
- assertEquals(s"application/octet-stream", if (header == null) null else new String(header.value()))
+ val header = headers.lastHeader("content-type")
+ assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
deserializer.deserialize(topic, data)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index af241b7..0e57e53 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -17,6 +17,7 @@
package kafka.api
+import java.lang.{Long => JLong}
import java.util.Properties
import java.util.concurrent.TimeUnit
@@ -124,27 +125,39 @@ class TransactionsTest extends KafkaServerTestHarness {
producer1.beginTransaction()
producer2.beginTransaction()
- producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "1".getBytes))
- producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "1".getBytes))
+
+ val latestVisibleTimestamp = System.currentTimeMillis()
+ producer2.send(new ProducerRecord(topic1, 0, latestVisibleTimestamp, "x".getBytes, "1".getBytes))
+ producer2.send(new ProducerRecord(topic2, 0, latestVisibleTimestamp, "x".getBytes, "1".getBytes))
producer2.flush()
- producer1.send(new ProducerRecord(topic1, 0, "a".getBytes, "1".getBytes))
- producer1.send(new ProducerRecord(topic1, 0, "b".getBytes, "2".getBytes))
- producer1.send(new ProducerRecord(topic2, 0, "c".getBytes, "3".getBytes))
- producer1.send(new ProducerRecord(topic2, 0, "d".getBytes, "4".getBytes))
+ val latestWrittenTimestamp = latestVisibleTimestamp + 1
+ producer1.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "a".getBytes, "1".getBytes))
+ producer1.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "b".getBytes, "2".getBytes))
+ producer1.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "c".getBytes, "3".getBytes))
+ producer1.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "d".getBytes, "4".getBytes))
producer1.flush()
- producer2.send(new ProducerRecord(topic1, 0, "x".getBytes, "2".getBytes))
- producer2.send(new ProducerRecord(topic2, 0, "x".getBytes, "2".getBytes))
+ producer2.send(new ProducerRecord(topic1, 0, latestWrittenTimestamp, "x".getBytes, "2".getBytes))
+ producer2.send(new ProducerRecord(topic2, 0, latestWrittenTimestamp, "x".getBytes, "2".getBytes))
producer2.commitTransaction()
// ensure the records are visible to the read uncommitted consumer
- readUncommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+ val tp1 = new TopicPartition(topic1, 0)
+ val tp2 = new TopicPartition(topic2, 0)
+ readUncommittedConsumer.assign(Set(tp1, tp2).asJava)
consumeRecords(readUncommittedConsumer, 8)
+ val readUncommittedOffsetsForTimes = readUncommittedConsumer.offsetsForTimes(Map(
+ tp1 -> (latestWrittenTimestamp: JLong),
+ tp2 -> (latestWrittenTimestamp: JLong)
+ ).asJava)
+ assertEquals(2, readUncommittedOffsetsForTimes.size)
+ assertEquals(latestWrittenTimestamp, readUncommittedOffsetsForTimes.get(tp1).timestamp)
+ assertEquals(latestWrittenTimestamp, readUncommittedOffsetsForTimes.get(tp2).timestamp)
readUncommittedConsumer.unsubscribe()
// we should only see the first two records which come before the undecided second transaction
- readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0), new TopicPartition(topic2, 0)).asJava)
+ readCommittedConsumer.assign(Set(tp1, tp2).asJava)
val records = consumeRecords(readCommittedConsumer, 2)
records.foreach { record =>
assertEquals("x", new String(record.key))
@@ -157,6 +170,14 @@ class TransactionsTest extends KafkaServerTestHarness {
readCommittedConsumer.assignment.asScala.foreach { tp =>
assertEquals(1L, readCommittedConsumer.position(tp))
}
+
+ // undecided timestamps should not be searchable either
+ val readCommittedOffsetsForTimes = readCommittedConsumer.offsetsForTimes(Map(
+ tp1 -> (latestWrittenTimestamp: JLong),
+ tp2 -> (latestWrittenTimestamp: JLong)
+ ).asJava)
+ assertNull(readCommittedOffsetsForTimes.get(tp1))
+ assertNull(readCommittedOffsetsForTimes.get(tp2))
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/9238aeaa/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fb17543..b8d8cc6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,14 +17,16 @@
package unit.kafka.server
-
+import java.lang.{Long => JLong}
import java.net.InetAddress
import java.util
-import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
+import kafka.cluster.Replica
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.{Log, TimestampOffset}
import kafka.network.RequestChannel
import kafka.network.RequestChannel.Session
import kafka.security.auth.Authorizer
@@ -43,12 +45,12 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Utils
import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{Assert, Before, Test}
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
import scala.collection.JavaConverters._
import scala.collection.Map
-
class KafkaApisTest {
private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
@@ -70,9 +72,7 @@ class KafkaApisTest {
private val clusterId = "clusterId"
private val time = new MockTime
-
-
- def createKafkaApis(interBrokerProtocolVersion: ApiVersion): KafkaApis = {
+ def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = {
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
@@ -123,7 +123,7 @@ class KafkaApisTest {
@Test
def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
val topicPartition = new TopicPartition("t", 0)
- val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+ val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
@@ -132,28 +132,18 @@ class KafkaApisTest {
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
- createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
-
- val send = capturedResponse.getValue.responseSend.get
- val channel = new ByteBufferChannel(send.size())
- send.writeTo(channel)
- channel.close()
-
- // read the size
- channel.buffer.getInt()
-
- val responseHeader = ResponseHeader.parse(channel.buffer)
- val struct = ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer)
+ createKafkaApis().handleWriteTxnMarkersRequest(request)
- val markersResponse = new WriteTxnMarkersResponse(struct)
- Assert.assertEquals(expectedErrors, markersResponse.errors(1))
+ val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
+ .asInstanceOf[WriteTxnMarkersResponse]
+ assertEquals(expectedErrors, markersResponse.errors(1))
}
@Test
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
val tp2 = new TopicPartition("t1", 0)
- val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
+ val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
@@ -164,7 +154,6 @@ class KafkaApisTest {
EasyMock.expect(replicaManager.getMagic(tp2))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
-
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
EasyMock.anyShort(),
EasyMock.eq(true),
@@ -180,29 +169,18 @@ class KafkaApisTest {
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+ createKafkaApis().handleWriteTxnMarkersRequest(request)
- createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
-
- val send = capturedResponse.getValue.responseSend.get
- val channel = new ByteBufferChannel(send.size())
- send.writeTo(channel)
- channel.close()
-
- // read the size
- channel.buffer.getInt()
-
- val responseHeader = ResponseHeader.parse(channel.buffer)
- val struct = ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer)
-
- val markersResponse = new WriteTxnMarkersResponse(struct)
- Assert.assertEquals(expectedErrors, markersResponse.errors(1))
+ val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
+ .asInstanceOf[WriteTxnMarkersResponse]
+ assertEquals(expectedErrors, markersResponse.errors(1))
EasyMock.verify(replicaManager)
}
@Test
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
val topicPartition = new TopicPartition("t", 0)
- val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+ val request = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))._2
EasyMock.expect(replicaManager.getMagic(topicPartition))
.andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
@@ -216,24 +194,177 @@ class KafkaApisTest {
EasyMock.replay(replicaManager)
- createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
+ createKafkaApis().handleWriteTxnMarkersRequest(request)
EasyMock.verify(replicaManager)
}
+ @Test
+ def testReadUncommittedConsumerListOffsetLimitedAtHighWatermark(): Unit = {
+ testConsumerListOffsetLimit(IsolationLevel.READ_UNCOMMITTED)
+ }
+
+ @Test
+ def testReadCommittedConsumerListOffsetLimitedAtLastStableOffset(): Unit = {
+ testConsumerListOffsetLimit(IsolationLevel.READ_COMMITTED)
+ }
+
+ private def testConsumerListOffsetLimit(isolationLevel: IsolationLevel): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val timestamp: JLong = time.milliseconds()
+ val limitOffset = 15L
+
+ val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+ val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+ val replica = EasyMock.mock(classOf[Replica])
+ val log = EasyMock.mock(classOf[Log])
+ EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+ if (isolationLevel == IsolationLevel.READ_UNCOMMITTED)
+ EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+ else
+ EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+ EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
+ EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp = timestamp, offset = limitOffset)))
+ expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
+
+ val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
+ .setTargetTimes(Map(tp -> timestamp).asJava)
+ val (listOffsetRequest, request) = buildRequest(builder)
+ createKafkaApis().handleListOffsetRequest(request)
+
+ val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
+ assertTrue(response.responseData.containsKey(tp))
+
+ val partitionData = response.responseData.get(tp)
+ assertEquals(Errors.NONE, partitionData.error)
+ assertEquals(ListOffsetResponse.UNKNOWN_OFFSET, partitionData.offset)
+ assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
+ }
+
+ @Test
+ def testReadUncommittedConsumerListOffsetEarliestOffsetEqualsHighWatermark(): Unit = {
+ testConsumerListOffsetEarliestOffsetEqualsLimit(IsolationLevel.READ_UNCOMMITTED)
+ }
+
+ @Test
+ def testReadCommittedConsumerListOffsetEarliestOffsetEqualsLastStableOffset(): Unit = {
+ testConsumerListOffsetEarliestOffsetEqualsLimit(IsolationLevel.READ_COMMITTED)
+ }
+
+ private def testConsumerListOffsetEarliestOffsetEqualsLimit(isolationLevel: IsolationLevel): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val limitOffset = 15L
+
+ val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+ val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+ val replica = EasyMock.mock(classOf[Replica])
+ val log = EasyMock.mock(classOf[Log])
+ EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+ if (isolationLevel == IsolationLevel.READ_UNCOMMITTED)
+ EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+ else
+ EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
+ EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
+ EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
+ .andReturn(Some(TimestampOffset(timestamp = ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset)))
+ expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
+
+ val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
+ .setTargetTimes(Map(tp -> (ListOffsetRequest.EARLIEST_TIMESTAMP: JLong)).asJava)
+ val (listOffsetRequest, request) = buildRequest(builder)
+ createKafkaApis().handleListOffsetRequest(request)
+
+ val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
+ assertTrue(response.responseData.containsKey(tp))
+
+ val partitionData = response.responseData.get(tp)
+ assertEquals(Errors.NONE, partitionData.error)
+ assertEquals(limitOffset, partitionData.offset)
+ assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
+ }
+
+ @Test
+ def testReadUncommittedConsumerListOffsetLatest(): Unit = {
+ testConsumerListOffsetLatest(IsolationLevel.READ_UNCOMMITTED)
+ }
+
+ @Test
+ def testReadCommittedConsumerListOffsetLatest(): Unit = {
+ testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
+ }
+
+ private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val latestOffset = 15L
+
+ val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+ val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+ val replica = EasyMock.mock(classOf[Replica])
+ val log = EasyMock.mock(classOf[Log])
+ EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+ if (isolationLevel == IsolationLevel.READ_UNCOMMITTED)
+ EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
+ else
+ EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
+ expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
+
+ val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
+ .setTargetTimes(Map(tp -> (ListOffsetRequest.LATEST_TIMESTAMP: JLong)).asJava)
+ val (listOffsetRequest, request) = buildRequest(builder)
+ createKafkaApis().handleListOffsetRequest(request)
+
+ val response = readResponse(ApiKeys.LIST_OFFSETS, listOffsetRequest, capturedResponse).asInstanceOf[ListOffsetResponse]
+ assertTrue(response.responseData.containsKey(tp))
+
+ val partitionData = response.responseData.get(tp)
+ assertEquals(Errors.NONE, partitionData.error)
+ assertEquals(latestOffset, partitionData.offset)
+ assertEquals(ListOffsetResponse.UNKNOWN_TIMESTAMP, partitionData.timestamp)
+ }
+
private def createWriteTxnMarkersRequest(partitions: util.List[TopicPartition]) = {
- val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(Utils.mkList(
- new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions))).build()
- val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS.id, writeTxnMarkersRequest.version(), "", 0)
- val byteBuffer = writeTxnMarkersRequest.serialize(header)
+ val requestBuilder = new WriteTxnMarkersRequest.Builder(Utils.mkList(
+ new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions)))
+ buildRequest(requestBuilder)
+ }
- val request = RequestChannel.Request(1, "1",
- Session(KafkaPrincipal.ANONYMOUS,
- InetAddress.getLocalHost),
- byteBuffer, 0,
- new ListenerName(""),
- SecurityProtocol.PLAINTEXT)
- (writeTxnMarkersRequest, request)
+ private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T]): (T, RequestChannel.Request) = {
+ val request = builder.build()
+ val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
+ val buffer = request.serialize(header)
+ val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
+ (request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
}
+ private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
+ val send = capturedResponse.getValue.responseSend.get
+ val channel = new ByteBufferChannel(send.size)
+ send.writeTo(channel)
+ channel.close()
+ channel.buffer.getInt() // read the size
+ ResponseHeader.parse(channel.buffer)
+ val struct = api.responseSchema(request.version).read(channel.buffer)
+ AbstractResponse.getResponse(api, struct)
+ }
+
+ private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: Capture[Int => Unit]): Unit = {
+ EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle(
+ EasyMock.anyString(),
+ EasyMock.anyString(),
+ EasyMock.anyLong(),
+ EasyMock.capture(capturedThrottleCallback),
+ EasyMock.anyObject[(Long => Unit) => Unit]()))
+ .andAnswer(new IAnswer[Unit] {
+ override def answer(): Unit = {
+ val callback = capturedThrottleCallback.getValue
+ callback(0)
+ }
+ })
+ }
}