You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/09 14:28:56 UTC

[GitHub] [kafka] ijuma commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r485652591



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,24 +1401,20 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       We should add a constant in `GroupMetadataManager` for this; maybe call it `CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION` for consistency with the other two similar fields.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {

Review comment:
       Nit: `TS` -> `Timestamp` for clarity. Also, `Version` -> `Versions`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {

Review comment:
       Is this test related to the change or a gap you identified? It's fine if it's the latter; I'm just trying to understand.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.get)

Review comment:
       This fails without the change, right?

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue

Review comment:
       Nit: `unsupportedVersion`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value

Review comment:
       Nit: `un-supported` -> `unsupported`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org