You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/09/21 20:52:45 UTC
[kafka] branch 2.4 updated: KAFKA-10401;
Ensure `currentStateTimeStamp` is set correctly by group
coordinator (#9202)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 50d2871 KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group coordinator (#9202)
50d2871 is described below
commit 50d2871b54f8aafb79ea076e225a7630a5a34e9d
Author: Luke Chen <43...@users.noreply.github.com>
AuthorDate: Tue Sep 22 04:35:16 2020 +0800
KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group coordinator (#9202)
Fix `currentStateTimeStamp` not being set when reading `GROUP_METADATA_VALUE_SCHEMA_V3` records, and do a small refactor so that the maximum supported version number is derived from `GROUP_VALUE_SCHEMAS` (via `GROUP_VALUE_SCHEMAS.keySet.max`) instead of being hard-coded. Also add tests for it.
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
.../coordinator/group/GroupMetadataManager.scala | 18 ++++------
.../group/GroupMetadataManagerTest.scala | 41 +++++++++++++++++++++-
2 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index ce04269..c04b5bb 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1123,6 +1123,7 @@ object GroupMetadataManager {
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+ private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.keySet.max
private def schemaForKey(version: Int) = {
val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
@@ -1386,23 +1387,18 @@ object GroupMetadataManager {
val valueSchema = schemaForGroupValue(version)
val value = valueSchema.read(buffer)
- if (version >= 0 && version <= 3) {
+ if (version >= 0 && version <= CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
val memberMetadataArray = value.getArray(MEMBERS_KEY)
val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
- val currentStateTimestamp: Option[Long] = version match {
- case version if version == 2 =>
- if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
- val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
- if (timestamp == -1) None else Some(timestamp)
- } else
- None
- case _ =>
- None
- }
+ val currentStateTimestamp: Option[Long] =
+ if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
+ val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
+ if (timestamp == -1) None else Some(timestamp)
+ } else None
val members = memberMetadataArray.map { memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b3a7f66..2256281 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -43,8 +43,9 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.KafkaException
import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue, assertThrows}
import org.junit.{Before, Test}
import org.scalatest.Assertions.fail
@@ -899,6 +900,44 @@ class GroupMetadataManagerTest {
}
@Test
+ def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+ val generation = 1
+ val protocol = "range"
+ val memberId = "memberId"
+ val unsupportedVersion = Short.MinValue
+
+ // put the unsupported version as the version value
+ val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+ .value().putShort(unsupportedVersion)
+ // reset the position to the starting position 0 so that it can read the data in correct order
+ groupMetadataRecordValue.position(0)
+
+ val e = assertThrows(classOf[KafkaException],
+ () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+ assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
+ }
+
+ @Test
+ def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+ val generation = 1
+ val protocol = "range"
+ val memberId = "memberId"
+
+ for (apiVersion <- ApiVersion.allVersions) {
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+ val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+ // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+ if (apiVersion >= KAFKA_2_1_IV0)
+ assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
+ Some(time.milliseconds()), deserializedGroupMetadata.currentStateTimestamp)
+ else
+ assertTrue(s"the apiVersion $apiVersion should not set the currentStateTimestamp.",
+ deserializedGroupMetadata.currentStateTimestamp.isEmpty)
+ }
+ }
+
+ @Test
def testReadFromOldGroupMetadata(): Unit = {
val generation = 1
val protocol = "range"