You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/25 15:24:06 UTC
[kafka] branch 2.1 updated: KAFKA-7535;
KafkaConsumer doesn't report records-lag if isolation.level is
read_committed
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new ff38a7f KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
ff38a7f is described below
commit ff38a7f8f04df6a3d1c6cae4d3a3d3924cf21e2f
Author: lambdaliu <la...@tencent.com>
AuthorDate: Thu Oct 25 08:22:21 2018 -0700
KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
FetchResponse should return the partitionData's lastStableOffset
Author: lambdaliu <la...@tencent.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Dhruvil Shah <dh...@confluent.io>, Dong Lin <li...@gmail.com>
Closes #5835 from lambdaliu/KAFKA-7535
(cherry picked from commit 73da59191606fe9c03ef6e1b978549bc1d9ee81e)
Signed-off-by: Dong Lin <li...@gmail.com>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++--
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/api/PlaintextConsumerTest.scala | 27 ++++++++++++++++++++++
.../scala/unit/kafka/server/FetchRequestTest.scala | 18 ++++++++++++++-
4 files changed, 47 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6a99db3..e3dc921 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -573,7 +573,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
// client.
new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
} catch {
case e: UnsupportedCompressionTypeException =>
@@ -582,7 +582,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
unconvertedRecords)
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d5a3c68..e3feb71 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -95,7 +95,7 @@ case class LogReadResult(info: FetchDataInfo,
override def toString =
s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
- s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
+ s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 522ca49..a23513f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1587,6 +1587,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
@Test
+ def testPerPartitionLagMetricsWhenReadCommitted() {
+ val numMessages = 1000
+ // send some messages.
+ val producer = createProducer()
+ sendRecords(producer, numMessages, tp)
+ sendRecords(producer, numMessages, tp2)
+
+ consumerConfig.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
+ consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
+ val consumer = createConsumer()
+ consumer.assign(List(tp).asJava)
+ var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+ TestUtils.waitUntilTrue(() => {
+ records = consumer.poll(100)
+ !records.records(tp).isEmpty
+ }, "Consumer did not consume any message before timeout.")
+ // Verify the metric exist.
+ val tags = new util.HashMap[String, String]()
+ tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+ tags.put("topic", tp.topic())
+ tags.put("partition", String.valueOf(tp.partition()))
+ val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+ assertNotNull(fetchLag)
+ }
+
+ @Test
def testPerPartitionLeadWithMaxPollRecords() {
val numMessages = 1000
val maxPollRecords = 10
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 72a2854..86f3314 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.junit.Assert._
import org.junit.Test
@@ -172,6 +172,22 @@ class FetchRequestTest extends BaseRequestTest {
}
@Test
+ def testFetchRequestV4WithReadCommitted(): Unit = {
+ initProducer()
+ val maxPartitionBytes = 200
+ val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
+ Seq(topicPartition))).isolationLevel(IsolationLevel.READ_COMMITTED).build(4)
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+ val partitionData = fetchResponse.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, partitionData.error)
+ assertTrue(partitionData.lastStableOffset > 0)
+ assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
+ }
+
+ @Test
def testFetchRequestToNonReplica(): Unit = {
val topic = "topic"
val partition = 0