You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/09/21 20:42:36 UTC

[kafka] branch 2.6 updated: KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group coordinator (#9202)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 23f9890  KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group coordinator (#9202)
23f9890 is described below

commit 23f9890e0e173f4ddbf91e6e088b15abacfbe6af
Author: Luke Chen <43...@users.noreply.github.com>
AuthorDate: Tue Sep 22 04:35:16 2020 +0800

    KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group coordinator (#9202)
    
    Fix the `currentStateTimeStamp` doesn't get set in `GROUP_METADATA_VALUE_SCHEMA_V3`, and did a small refactor to use the `GROUP_VALUE_SCHEMAS.size - 1` replace the default hard-coded max version number. Also add test for it.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 .../coordinator/group/GroupMetadataManager.scala   | 18 ++++------
 .../group/GroupMetadataManagerTest.scala           | 41 +++++++++++++++++++++-
 2 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index dea0b77..bd0f825 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1138,6 +1138,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.keySet.max
 
   private def schemaForKey(version: Int) = {
     val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
@@ -1401,23 +1402,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
+            val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
+            if (timestamp == -1) None else Some(timestamp)
+          } else None
 
         val members = memberMetadataArray.map { memberMetadataObj =>
           val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 08a868c..f7de5b2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -43,8 +43,9 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.KafkaException
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue, assertThrows}
 import org.junit.{Before, Test}
 import org.scalatest.Assertions.fail
 
@@ -909,6 +910,44 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unsupportedVersion = Short.MinValue
+
+    // put the unsupported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unsupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
+          Some(time.milliseconds()), deserializedGroupMetadata.currentStateTimestamp)
+      else
+        assertTrue(s"the apiVersion $apiVersion should not set the currentStateTimestamp.",
+          deserializedGroupMetadata.currentStateTimestamp.isEmpty)
+    }
+  }
+
+  @Test
   def testReadFromOldGroupMetadata(): Unit = {
     val generation = 1
     val protocol = "range"