You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/25 15:24:06 UTC

[kafka] branch 2.1 updated: KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new ff38a7f  KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
ff38a7f is described below

commit ff38a7f8f04df6a3d1c6cae4d3a3d3924cf21e2f
Author: lambdaliu <la...@tencent.com>
AuthorDate: Thu Oct 25 08:22:21 2018 -0700

    KAFKA-7535; KafkaConsumer doesn't report records-lag if isolation.level is read_committed
    
    FetchResponse should return the partitionData's lastStableOffset
    
    Author: lambdaliu <la...@tencent.com>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Dhruvil Shah <dh...@confluent.io>, Dong Lin <li...@gmail.com>
    
    Closes #5835 from lambdaliu/KAFKA-7535
    
    (cherry picked from commit 73da59191606fe9c03ef6e1b978549bc1d9ee81e)
    Signed-off-by: Dong Lin <li...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  4 ++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../kafka/api/PlaintextConsumerTest.scala          | 27 ++++++++++++++++++++++
 .../scala/unit/kafka/server/FetchRequestTest.scala | 18 ++++++++++++++-
 4 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6a99db3..e3dc921 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -573,7 +573,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
                 // client.
                 new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-                  FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+                  partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
                   new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
               } catch {
                 case e: UnsupportedCompressionTypeException =>
@@ -582,7 +582,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             }
           case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-            FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+            partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
             unconvertedRecords)
         }
       }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d5a3c68..e3feb71 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -95,7 +95,7 @@ case class LogReadResult(info: FetchDataInfo,
 
   override def toString =
     s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
-    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
+    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
 
 }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 522ca49..a23513f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1587,6 +1587,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testPerPartitionLagMetricsWhenReadCommitted() {
+    val numMessages = 1000
+    // send some messages.
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
+    sendRecords(producer, numMessages, tp2)
+
+    consumerConfig.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.records(tp).isEmpty
+    }, "Consumer did not consume any message before timeout.")
+    // Verify the metric exist.
+    val tags = new util.HashMap[String, String]()
+    tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+    tags.put("topic", tp.topic())
+    tags.put("partition", String.valueOf(tp.partition()))
+    val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+    assertNotNull(fetchLag)
+  }
+
+  @Test
   def testPerPartitionLeadWithMaxPollRecords() {
     val numMessages = 1000
     val maxPollRecords = 10
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 72a2854..86f3314 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import org.junit.Assert._
 import org.junit.Test
@@ -172,6 +172,22 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   @Test
+  def testFetchRequestV4WithReadCommitted(): Unit = {
+    initProducer()
+    val maxPartitionBytes = 200
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
+    producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+      "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
+      Seq(topicPartition))).isolationLevel(IsolationLevel.READ_COMMITTED).build(4)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE, partitionData.error)
+    assertTrue(partitionData.lastStableOffset > 0)
+    assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
+  }
+
+  @Test
   def testFetchRequestToNonReplica(): Unit = {
     val topic = "topic"
     val partition = 0