Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/20 04:28:11 UTC

[GitHub] [kafka] showuon opened a new pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

showuon opened a new pull request #9202:
URL: https://github.com/apache/kafka/pull/9202


   Fix `currentStateTimestamp` not being set when reading `GROUP_METADATA_VALUE_SCHEMA_V3` records, and do a small refactor that replaces the hard-coded max version number with `GROUP_VALUE_SCHEMAS.size - 1`. Also add a test for it.
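
A rough illustration of the bug described above. This is a simplified sketch, not the actual `GroupMetadataManager` code: `TimestampVersionCheck`, `oldRead` and `newRead` are hypothetical names, and the raw timestamp is passed in directly instead of being read from a schema. Matching only on version 2 drops the timestamp for version 3 records, while a `version >= 2` check keeps it:

```scala
object TimestampVersionCheck {
  // Old behaviour (simplified): only version 2 is treated as carrying the timestamp,
  // so a version 3 record always yields None.
  def oldRead(version: Int, rawTimestamp: Long): Option[Long] =
    version match {
      case 2 => if (rawTimestamp == -1L) None else Some(rawTimestamp)
      case _ => None
    }

  // Fixed behaviour (simplified): version 2 and every later version carry the timestamp.
  // -1 is the sentinel for "no timestamp", as in the diff under review.
  def newRead(version: Int, rawTimestamp: Long): Option[Long] =
    if (version >= 2 && rawTimestamp != -1L) Some(rawTimestamp) else None
}
```

With these definitions, `oldRead(3, 100L)` is `None` while `newRead(3, 100L)` is `Some(100L)`.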
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9202:
URL: https://github.com/apache/kafka/pull/9202


   





[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r491908542



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1402,23 +1403,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {

Review comment:
       @hachikuji, you're right: the `CURRENT_STATE_TIMESTAMP_KEY` field is only set for `version >= 2` in normal cases. I'm not sure whether a version 1 record could ever contain the `CURRENT_STATE_TIMESTAMP_KEY` field. In my opinion the check is fine and makes the code more robust, and most importantly it's a simple check that adds little overhead. What do you think?
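
The guard discussed in this comment can be sketched as follows. This is a minimal sketch under stated assumptions: a plain `Map[String, Long]` stands in for the deserialized record, and `FieldGuardSketch`, `TimestampKey` and `readTimestamp` are hypothetical names, not Kafka APIs. It only shows how combining the version check with a field-presence check behaves:

```scala
object FieldGuardSketch {
  // Assumed field name, for illustration only.
  val TimestampKey = "current_state_timestamp"

  // The timestamp is returned only when the version is at least 2 AND the field is present.
  // A stored value of -1 is treated as "no timestamp".
  def readTimestamp(version: Int, fields: Map[String, Long]): Option[Long] =
    if (version >= 2 && fields.contains(TimestampKey)) {
      val timestamp = fields(TimestampKey)
      if (timestamp == -1L) None else Some(timestamp)
    } else
      None
}
```

The presence check costs one lookup and means a record that lacks the field yields `None` instead of failing.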







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694684653


   `tests/Build/JDK 15`: pass
   `tests/Build/JDK 8`: pass
   `tests/Build/JDK 11`: failed 1 test case which is not related to my change.
   kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota failed





[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-690082467


   @ijuma , thanks for your good comments/suggestions. I've updated in this commit: https://github.com/apache/kafka/pull/9202/commits/f01aaa643840c871d613c9e0af538b79519f2667. Thanks.





[GitHub] [kafka] ijuma commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r475046516



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,16 +1401,17 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1
 
-      if (version >= 0 && version <= 3) {
+      if (0 to maxVersion contains version) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
         val currentStateTimestamp: Option[Long] = version match {

Review comment:
       We should use an if/else here, no point in using pattern matching.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,16 +1401,17 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1
 
-      if (version >= 0 && version <= 3) {
+      if (0 to maxVersion contains version) {

Review comment:
       I'd keep the old code and simply change the second clause to use `maxVersion`.
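
For comparison, the two forms of the bounds check give the same result. The sketch below assumes a maximum version of 3, and `VersionBoundsSketch` and its members are hypothetical names. The explicit comparison avoids building a `Range` and reads like the original code:

```scala
object VersionBoundsSketch {
  // Assumed maximum supported version, for illustration only.
  val maxVersion: Int = 3

  // Form proposed in the first revision of the patch.
  def viaRange(version: Int): Boolean = (0 to maxVersion).contains(version)

  // Form suggested in review: keep the comparison, swap the literal for maxVersion.
  def viaComparison(version: Int): Boolean = version >= 0 && version <= maxVersion
}
```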







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694592201


   Merged the latest trunk to pick up the fix for the flaky tests (addressed in https://github.com/apache/kafka/pull/9294).





[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r486161428



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.get)

Review comment:
       Correct! Also, after your question, I tried it and found that the message when the test fails is:
   ```
   kafka.coordinator.group.GroupMetadataManagerTest > testCurrentStateTimestampForAllGroupMetadataVersions FAILED
       java.util.NoSuchElementException: None.get
   ```
   That is unclear and unhelpful for debugging, so I added a failure message and used `getOrElse(-1)` to produce an unexpected timestamp when the value is `None`. Now the failure message is:
   ```
   kafka.coordinator.group.GroupMetadataManagerTest > testCurrentStateTimestampForAllGroupMetadataVersions FAILED
       java.lang.AssertionError: the apiVersion 2.3-IV0 doesn't set the currentStateTimestamp correctly. expected:<1599726297785> but was:<-1>
   ```
   This makes it clearer which version caused the error and what the error is.
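
The difference between the two failure modes can be sketched without JUnit. In this sketch `TimestampAssertionSketch` and `check` are hypothetical, and the message text is modelled on the output quoted above. Calling `.get` on `None` throws `NoSuchElementException`, whereas `getOrElse(-1L)` yields a value that can be compared and reported:

```scala
object TimestampAssertionSketch {
  // Returns Right(()) when the timestamp matches, otherwise Left with a descriptive message.
  // getOrElse(-1L) turns a missing timestamp into a sentinel instead of throwing.
  def check(apiVersion: String, expected: Long, actual: Option[Long]): Either[String, Unit] = {
    val observed = actual.getOrElse(-1L)
    if (observed == expected) Right(())
    else Left(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly. " +
      s"expected:<$expected> but was:<$observed>")
  }
}
```

The resulting message names the version that failed, which `None.get` alone cannot do.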







[GitHub] [kafka] ijuma commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r485652591



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,24 +1401,20 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       We should add a constant in `GroupMetadataManager` for this, maybe call it `CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION` for consistency with the other two similar fields.
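
A minimal sketch of such a constant. It assumes the schema versions are contiguous and start at 0, so that `size - 1` equals the highest version. The map contents here are placeholders, and `SchemaVersionSketch` and `isSupported` are hypothetical names:

```scala
object SchemaVersionSketch {
  // Placeholder for the real schema map: version -> schema (strings stand in for schemas).
  val GROUP_VALUE_SCHEMAS: Map[Int, String] =
    Map(0 -> "V0", 1 -> "V1", 2 -> "V2", 3 -> "V3")

  // Derived once, so adding a schema version updates the bound automatically.
  // Only valid while the versions are exactly 0 until size.
  val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION: Short =
    (GROUP_VALUE_SCHEMAS.size - 1).toShort

  def isSupported(version: Short): Boolean =
    version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION
}
```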

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {

Review comment:
       Nit: `TS` -> `Timestamp` for clarity. Also, `Version` -> `Versions`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {

Review comment:
       Is this test related to the change or a gap you identified? It's fine if it's the latter, just trying to understand.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.get)

Review comment:
       This fails without the change, right?

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue

Review comment:
       Nit: `unsupportedVersion`.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value

Review comment:
       Nit: `un-supported` -> `unsupported`.
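
The buffer handling in the unsupported-version test above relies on how relative writes move a `ByteBuffer`'s position. The sketch below uses only `java.nio.ByteBuffer`. `VersionHeaderSketch` and its methods are hypothetical, and the layout (a 2-byte version followed by a payload) is an assumption made for illustration:

```scala
import java.nio.ByteBuffer

object VersionHeaderSketch {
  // Builds a buffer whose first two bytes hold the version, ready for reading from position 0.
  def recordWithVersion(version: Short, payload: Array[Byte]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(2 + payload.length)
    buffer.putShort(version).put(payload)
    buffer.flip() // limit = bytes written, position = 0
    buffer
  }

  // Overwrites the version in place. The relative putShort advances the position by 2,
  // so the position must be reset to 0 before the buffer is read again.
  def overwriteVersion(buffer: ByteBuffer, newVersion: Short): ByteBuffer = {
    buffer.putShort(newVersion)
    buffer.position(0)
    buffer
  }
}
```

Without the reset, a reader would start after the version bytes and misinterpret the payload.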







[GitHub] [kafka] hachikuji commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r492203555



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1402,23 +1403,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {

Review comment:
       No strong opinion. I'm ok leaving the check.







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-686346005


   Merged the latest trunk to trigger the auto-build.





[GitHub] [kafka] showuon edited a comment on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-678914354


   @ijuma , thanks for your comments. I've updated in this commit: https://github.com/apache/kafka/pull/9202/commits/ae5ae323f159afc9bb384252e45bef640531ea94. I also added one more test to test the unsupported version exception thrown case. Thanks.





[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-684517819


   @ijuma , could you help review this PR again? Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r491908542



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1402,23 +1403,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {

Review comment:
       @hachikuji, you're right: the `CURRENT_STATE_TIMESTAMP_KEY` field is only set for `version >= 2` in normal cases. I'm not sure whether a version 0 or 1 record could ever contain the `CURRENT_STATE_TIMESTAMP_KEY` field. In my opinion the check is fine and makes the code more robust, and most importantly it's a simple check that adds little overhead. What do you think?







[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r491908542



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1402,23 +1403,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {

Review comment:
       @hachikuji, you're right: the `CURRENT_STATE_TIMESTAMP_KEY` field is only set for `version >= 2` in normal cases. I'm not sure whether a version 0 or 1 record could ever come with the `CURRENT_STATE_TIMESTAMP_KEY` field. In my opinion the check is fine and makes the code more robust, and most importantly it's a simple check that adds little overhead. What do you think?







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-686864179


   `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed.








[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-677036776


   @abbccdda @vahidhashemian @hachikuji , could you help review this PR? Thanks.





[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-692508123


   @ijuma , could you check this PR again? Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r486161428



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unSupportedVersion = Short.MinValue
+
+    // put the un-supported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unSupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unSupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTSForAllGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.get)

Review comment:
       Correct! Also, prompted by your question, I tried it and found that the message when the test fails is:
   ```
   kafka.coordinator.group.GroupMetadataManagerTest > testCurrentStateTimestampForAllGroupMetadataVersions FAILED
       java.util.NoSuchElementException: None.get
   ```
   That is unclear and not helpful for debugging, so I added a failure message and used `getOrElse(-1)` to produce an unexpected timestamp when the value is `None`. Now the failure message is:
   ```
   kafka.coordinator.group.GroupMetadataManagerTest > testCurrentStateTimestampForAllGroupMetadataVersions FAILED
       java.lang.AssertionError: the apiVersion 2.3-IV0 doesn't set the currentStateTimestamp correctly. expected:<1599726297785> but was:<-1>
   ```
   This makes it clearer which version caused the error and what the error is.
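
   A minimal sketch of the two behaviours, using a plain `Option[Long]` in place of the deserialized group metadata. The `NoneGet` object and its member names are hypothetical and only serve the illustration:

   ```scala
   import scala.util.Try

   object NoneGet {
     // Models a record whose currentStateTimestamp was never set.
     val missing: Option[Long] = None

     // Option.get on None throws NoSuchElementException, which hides the real cause.
     val viaGet: Try[Long] = Try(missing.get)

     // getOrElse substitutes a sentinel, so an assertion can report expected vs. actual.
     val viaGetOrElse: Long = missing.getOrElse(-1L)
   }
   ```

   With the sentinel, the assertion fails on a value mismatch rather than on an exception thrown while computing the actual value.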







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694693926


   @ijuma , the test results are listed in the previous comment. I think this is ready to merge. Thanks.





[GitHub] [kafka] hachikuji commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r491240953



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1402,23 +1403,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {

Review comment:
       I wonder if the version check is necessary. Is the presence of the field enough?







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-689244512


   @ijuma , could you help review this PR again? Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r486162637



##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,42 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {

Review comment:
       This test isn't related to the change. I just wanted to add a missing test for something I noticed. :)







[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r490283148



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1139,6 +1139,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       Good suggestion! Thanks.

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,44 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unsupportedVersion = Short.MinValue
+
+    // put the unsupported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unsupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
+          time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.getOrElse(-1))

Review comment:
       Good suggestion! Thanks.







[GitHub] [kafka] ijuma commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-678594470


   ok to test





[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-678914354


   @ijuma , thanks for your comments. I've updated the PR in this commit: https://github.com/apache/kafka/pull/9202/commits/ae5ae323f159afc9bb384252e45bef640531ea94. I'm also adding one more test for the case where an unsupported version throws an exception. Thanks.





[GitHub] [kafka] ijuma commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r490262817



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1139,6 +1139,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       This would be more robust if we did something like:
   
   ```scala
   GROUP_VALUE_SCHEMAS.keySet.max
   ```
   
   Or something along those lines.
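
   A minimal sketch of why the key-based maximum can be more robust, assuming `GROUP_VALUE_SCHEMAS` is a `Map` keyed by version (as `.keySet` implies). The `SchemaVersions` object and its string values are hypothetical stand-ins, not the Kafka definitions:

   ```scala
   object SchemaVersions {
     // Hypothetical stand-in for GROUP_VALUE_SCHEMAS: version -> schema name.
     val contiguous: Map[Int, String] = Map(0 -> "v0", 1 -> "v1", 2 -> "v2", 3 -> "v3")

     // Same map with version 1 removed, so the keys are no longer 0..n-1.
     val sparse: Map[Int, String] = contiguous - 1

     // Derives the max version from the number of entries.
     def maxBySize(schemas: Map[Int, String]): Int = schemas.size - 1

     // Derives the max version from the keys themselves.
     def maxByKey(schemas: Map[Int, String]): Int = schemas.keySet.max
   }
   ```

   The two approaches agree while the keys run contiguously from 0, and diverge as soon as a version is skipped or removed.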

##########
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,44 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unsupportedVersion = Short.MinValue
+
+    // put the unsupported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unsupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
+          time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.getOrElse(-1))

Review comment:
       I think it would be better to have the expected side be `Some(time.milliseconds())`, then you don't need the `getOrElse` on the actual side.
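
   A minimal sketch of comparing the `Option` values directly. The `OptionAssertions` object and its `describe` helper are hypothetical; the message only loosely imitates an assertion failure and is not JUnit output:

   ```scala
   object OptionAssertions {
     // Compares the Option itself, so a missing value shows up as None
     // instead of an exception or a sentinel such as -1.
     def describe(expected: Long, actual: Option[Long]): String =
       if (actual == Some(expected)) "ok"
       else s"expected:<Some($expected)> but was:<$actual>"
   }
   ```

   For example, `OptionAssertions.describe(5L, None)` reports `None` as the actual value, which makes the missing timestamp explicit.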







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694267556


   @ijuma , thanks for the comments. I've updated in this commit: https://github.com/apache/kafka/pull/9202/commits/97fe0a53d450d103e25dae0669c3ac803d7afe40. Thank you!





[GitHub] [kafka] hachikuji merged pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9202:
URL: https://github.com/apache/kafka/pull/9202


   





[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r491908542



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1402,23 +1403,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {

Review comment:
       @hachikuji , you're right: in normal cases the `CURRENT_STATE_TIMESTAMP_KEY` field is only set when `version >= 2`. I'm not sure whether a version 0 or 1 record could ever contain the `CURRENT_STATE_TIMESTAMP_KEY` field. In my opinion the check is fine and makes the code more robust, and most importantly it's a simple check that adds very little overhead. What do you think?
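
   To make the trade-off concrete, a minimal sketch with a `Map[String, Long]` standing in for the deserialized `Struct`. The `TimestampGuard` object, its method names, and the field name string are assumptions for illustration only:

   ```scala
   object TimestampGuard {
     // Hypothetical field name standing in for CURRENT_STATE_TIMESTAMP_KEY.
     val TimestampKey = "current_state_timestamp"

     // Field-presence check only; -1 is treated as "not set".
     def byField(fields: Map[String, Long]): Option[Long] =
       fields.get(TimestampKey).filter(_ != -1L)

     // Version check plus field presence, mirroring the shape of the diff above.
     def byVersionAndField(version: Int, fields: Map[String, Long]): Option[Long] =
       if (version >= 2) byField(fields) else None
   }
   ```

   For well-formed records both guards agree. They differ only for a hypothetical version 0 or 1 record that somehow carries the field.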








[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r486159096



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,24 +1401,20 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       Good suggestion! Updated.







[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r475352582



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,16 +1401,17 @@ object GroupMetadataManager {
       val version = buffer.getShort
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
+      val maxVersion = GROUP_VALUE_SCHEMAS.size - 1
 
-      if (version >= 0 && version <= 3) {
+      if (0 to maxVersion contains version) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
         val currentStateTimestamp: Option[Long] = version match {

Review comment:
       Nice catch! Yes, it's better to use if/else. 
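
   A minimal sketch of the difference, assuming the timestamp has already been read from the record. `VersionGuard`, `oldLogic`, and `newLogic` are hypothetical names; only the branching mirrors the code under review:

   ```scala
   object VersionGuard {
     // Old shape: only version 2 matched, so version 3 fell through to None.
     def oldLogic(version: Int, timestamp: Long): Option[Long] = version match {
       case v if v == 2 => if (timestamp == -1L) None else Some(timestamp)
       case _ => None
     }

     // New shape: any version from 2 upward keeps the timestamp, unless it is -1.
     def newLogic(version: Int, timestamp: Long): Option[Long] =
       if (version >= 2 && timestamp != -1L) Some(timestamp) else None
   }
   ```

   The plain if/else also reads more directly than a `match` whose only real case is a guard on the matched value.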






