You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/25 15:46:01 UTC

[kafka] branch 2.0 updated: KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new d43e216  KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
d43e216 is described below

commit d43e2167b657ac0e2422a04140f7eb6acbe76d70
Author: lambdaliu <la...@tencent.com>
AuthorDate: Thu Oct 25 08:22:21 2018 -0700

    KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
    
    FetchResponse should return the partitionData's lastStableOffset
    
    Author: lambdaliu <la...@tencent.com>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Dhruvil Shah <dh...@confluent.io>, Dong Lin <li...@gmail.com>
    
    Closes #5835 from lambdaliu/KAFKA-7535
---
 core/src/main/scala/kafka/server/KafkaApis.scala       |  2 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala     | 18 +++++++++++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c88be9..d8d94e5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -575,7 +575,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
           }.getOrElse(unconvertedRecords)
         new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-          FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+          partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
           convertedRecords)
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 67f33eb..b995025 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import org.junit.Assert._
 import org.junit.Test
@@ -170,6 +170,22 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   @Test
+  def testFetchRequestV4WithReadCommitted(): Unit = {
+    initProducer()
+    val maxPartitionBytes = 200
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
+    producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+      "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
+      Seq(topicPartition))).isolationLevel(IsolationLevel.READ_COMMITTED).build(4)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertTrue(partitionData.lastStableOffset > 0)
+    assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
+  }
+
+  @Test
   def testFetchRequestToNonReplica(): Unit = {
     val topic = "topic"
     val partition = 0