You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/25 15:46:01 UTC
[kafka] branch 2.0 updated: KAFKA-7535;
KafkaConsumer doesn't report records-lag if isolation.level is
read_committed
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new d43e216 KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
d43e216 is described below
commit d43e2167b657ac0e2422a04140f7eb6acbe76d70
Author: lambdaliu <la...@tencent.com>
AuthorDate: Thu Oct 25 08:22:21 2018 -0700
KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
FetchResponse should return the partitionData's lastStableOffset
Author: lambdaliu <la...@tencent.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Dhruvil Shah <dh...@confluent.io>, Dong Lin <li...@gmail.com>
Closes #5835 from lambdaliu/KAFKA-7535
---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 18 +++++++++++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c88be9..d8d94e5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -575,7 +575,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
}.getOrElse(unconvertedRecords)
new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
convertedRecords)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 67f33eb..b995025 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.junit.Assert._
import org.junit.Test
@@ -170,6 +170,22 @@ class FetchRequestTest extends BaseRequestTest {
}
@Test
+ def testFetchRequestV4WithReadCommitted(): Unit = {
+ initProducer()
+ val maxPartitionBytes = 200
+ val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
+ Seq(topicPartition))).isolationLevel(IsolationLevel.READ_COMMITTED).build(4)
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+ val partitionData = fetchResponse.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, partitionData.error)
+ assertTrue(partitionData.lastStableOffset > 0)
+ assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
+ }
+
+ @Test
def testFetchRequestToNonReplica(): Unit = {
val topic = "topic"
val partition = 0