Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/04/05 23:13:19 UTC

[GitHub] [kafka] jeffkbkim opened a new pull request, #13511: KAFKA-14869: ignore unknown record types for coordinators

jeffkbkim opened a new pull request, #13511:
URL: https://github.com/apache/kafka/pull/13511

   KIP-915: Part 1 of 2
   
   Coordinators should ignore unknown record types.
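A minimal sketch of the idea, assuming a simplified key model: `LogKey`, `TxnKey` and `UnknownKey` below are hypothetical stand-ins, not Kafka's classes. When the log is replayed to rebuild in-memory state, records whose key version is unknown are counted and skipped instead of failing the load, while known records are applied (a `None` value acts as a tombstone).

```scala
import scala.collection.mutable

// Hypothetical, simplified model of coordinator log keys (not Kafka's classes).
sealed trait LogKey
final case class TxnKey(transactionalId: String) extends LogKey
final case class UnknownKey(version: Short) extends LogKey

object ReplaySketch {
  // Replays (key, value) pairs in log order; a None value is a tombstone.
  // Records with an unknown key version are skipped rather than failing the load.
  def replay(records: Seq[(LogKey, Option[String])]): (Map[String, String], Int) = {
    val state = mutable.LinkedHashMap.empty[String, String]
    var skipped = 0
    records.foreach {
      case (TxnKey(id), Some(value)) => state.put(id, value)
      case (TxnKey(id), None)        => state.remove(id)
      case (UnknownKey(_), _)        => skipped += 1
    }
    (state.toMap, skipped)
  }
}
```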
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165986528


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record
+
+        case unexpectedKey =>
+          throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.")

Review Comment:
   Does it help if we use a `sealed trait`?
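For illustration, a sketch of what sealing the key hierarchy could look like, using hypothetical stand-ins for the coordinator key classes. Because the trait is `sealed`, scalac knows every subtype and checks the match for exhaustiveness, so a `case unexpectedKey => throw ...` fallback is no longer needed; adding a new subtype later would surface as a `match may not be exhaustive` warning at each match site.

```scala
// Hypothetical stand-ins for the coordinator key hierarchy (not Kafka's classes).
sealed trait BaseKey { def version: Short }
final case class OffsetKey(version: Short, group: String) extends BaseKey
final case class GroupMetadataKey(version: Short, group: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

object SealedSketch {
  // BaseKey is sealed, so the compiler verifies that these three cases cover
  // every possible key; no catch-all branch is required.
  def describe(key: BaseKey): String = key match {
    case OffsetKey(_, group)        => s"offset commit for $group"
    case GroupMetadataKey(_, group) => s"group metadata for $group"
    case UnknownKey(version)        => s"unknown, version $version"
  }
}
```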





[GitHub] [kafka] dajac commented on a diff in pull request #13511: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, Part-1)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1168294803


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +682,11 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case unknownKey: UnknownKey =>
+                    // Unknown versions may exist when a downgraded coordinator is reading records from the log.
+                    warn(s"Unknown message key with version ${unknownKey.version}" +
+                      s" while loading offsets and group metadata. Ignoring it. " +

Review Comment:
   nit: I wonder if we could add `while loading offsets and group metadata from $topicPartition`. Having the partition may be useful as well.
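A minimal sketch of the suggested message, assuming the partition is passed in as a plain string (the real code would interpolate a `TopicPartition`); `unknownKeyWarning` is a hypothetical helper. Only the fragments that interpolate values carry the `s` prefix.

```scala
object WarnMessageSketch {
  // Hypothetical helper that builds the warning logged for an ignored key.
  // The last fragment has no interpolation, so it is a plain string literal.
  def unknownKeyWarning(version: Short, topicPartition: String): String =
    s"Unknown message key with version $version" +
      s" while loading offsets and group metadata from $topicPartition. Ignoring it. " +
      "It could be a leftover from an aborted upgrade."
}
```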



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +682,11 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case unknownKey: UnknownKey =>
+                    // Unknown versions may exist when a downgraded coordinator is reading records from the log.
+                    warn(s"Unknown message key with version ${unknownKey.version}" +
+                      s" while loading offsets and group metadata. Ignoring it. " +
+                      s"It could be a left over from an aborted upgrade.")

Review Comment:
   nit: We can remove the `s` as there is no interpolation for this one.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +682,11 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case unknownKey: UnknownKey =>
+                    // Unknown versions may exist when a downgraded coordinator is reading records from the log.

Review Comment:
   nit: We could remove this comment as the warning contains everything.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -467,16 +467,23 @@ class TransactionStateManager(brokerId: Int,
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not be null")
-                val txnKey = TransactionLog.readTxnRecordKey(record.key)
-                // load transaction metadata along with transaction state
-                val transactionalId = txnKey.transactionalId
-                TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
-                  case None =>
-                    loadedTransactions.remove(transactionalId)
-                  case Some(txnMetadata) =>
-                    loadedTransactions.put(transactionalId, txnMetadata)
+                TransactionLog.readTxnRecordKey(record.key) match {
+                  case txnKey: TxnKey =>
+                    // load transaction metadata along with transaction state
+                    val transactionalId = txnKey.transactionalId
+                    TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
+                      case None =>
+                        loadedTransactions.remove(transactionalId)
+                      case Some(txnMetadata) =>
+                        loadedTransactions.put(transactionalId, txnMetadata)
+                    }
+                    currOffset = batch.nextOffset
+
+                  case unknownKey: UnknownKey =>
+                    warn(s"Unknown message key with version ${unknownKey.version}" +
+                      s" while loading transaction state. Ignoring it. " +

Review Comment:
   nit: Could we add the partition here as well?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -467,16 +467,23 @@ class TransactionStateManager(brokerId: Int,
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not be null")
-                val txnKey = TransactionLog.readTxnRecordKey(record.key)
-                // load transaction metadata along with transaction state
-                val transactionalId = txnKey.transactionalId
-                TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
-                  case None =>
-                    loadedTransactions.remove(transactionalId)
-                  case Some(txnMetadata) =>
-                    loadedTransactions.put(transactionalId, txnMetadata)
+                TransactionLog.readTxnRecordKey(record.key) match {
+                  case txnKey: TxnKey =>
+                    // load transaction metadata along with transaction state
+                    val transactionalId = txnKey.transactionalId
+                    TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
+                      case None =>
+                        loadedTransactions.remove(transactionalId)
+                      case Some(txnMetadata) =>
+                        loadedTransactions.put(transactionalId, txnMetadata)
+                    }
+                    currOffset = batch.nextOffset
+
+                  case unknownKey: UnknownKey =>
+                    warn(s"Unknown message key with version ${unknownKey.version}" +
+                      s" while loading transaction state. Ignoring it. " +
+                      s"It could be a left over from an aborted upgrade.")

Review Comment:
   nit: `s` can be removed here as well.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2762,4 +2768,12 @@ class GroupMetadataManagerTest {
     assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
     assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
   }
+
+  @Test
+  def testIgnoreUnknownMessageKeyVersion(): Unit = {

Review Comment:
   nit: `testReadMessageKeyCanReadUnknownMessage`?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -640,8 +640,14 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+
+    // Should ignore unknown record

Review Comment:
   I was thinking about this one. I wonder if it would not be better to pull this into a separate unit test instead of hijacking existing ones. Is it possible?



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,11 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testReadUnknownMessageKeyVersion(): Unit = {

Review Comment:
   nit: `testReadTxnRecordKeyCanReadUnknownMessage`?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13511: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, Part-1)

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1168339962


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -640,8 +640,14 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+
+    // Should ignore unknown record
+    val unknownKey = new org.apache.kafka.coordinator.group.generated.GroupMetadataKey()
+    val unknownMessage = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, unknownKey)
+    val unknownRecord = new SimpleRecord(unknownMessage, unknownMessage)
+
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
+      (offsetCommitRecords ++ Seq(unknownRecord) ++ Seq(groupMetadataRecord)).toArray: _*)

Review Comment:
   Nit: maybe we could have a test with a few more unknown records in the list with different versions.
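One way such a test could be shaped, sketched without Kafka's test fixtures: the hypothetical `encode`/`decode` helpers below write a 2-byte big-endian version followed by a UTF-8 payload, treat versions 0 to 2 as known (an assumption for illustration only), and drop everything else. Mixing several unknown versions, including `Short.MaxValue` and a negative one, checks that the skip path does not depend on one specific version.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object MultiVersionSketch {
  // Hypothetical encoding: 2-byte big-endian version, then a UTF-8 payload.
  def encode(version: Short, payload: String): Array[Byte] = {
    val bytes = payload.getBytes(StandardCharsets.UTF_8)
    ByteBuffer.allocate(2 + bytes.length).putShort(version).put(bytes).array()
  }

  // Returns Some(payload) for versions assumed known (0 to 2), None otherwise.
  def decode(record: Array[Byte]): Option[String] = {
    val buffer = ByteBuffer.wrap(record)
    val version = buffer.getShort()
    if (version >= 0 && version <= 2) {
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      Some(new String(bytes, StandardCharsets.UTF_8))
    } else None
  }

  // Keeps the payloads of known records, in order, silently skipping unknown ones.
  def loadKnown(records: Seq[Array[Byte]]): Seq[String] =
    records.flatMap(r => decode(r).toList)
}
```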





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165922479


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1368,3 +1378,8 @@ case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
   override def toString: String = key
 }
 
+case class UnknownKey(version: Short, key: String = null) extends BaseKey {

Review Comment:
   thanks. removed the field





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165919308


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1155,7 +1159,12 @@ object GroupMetadataManager {
       // version 2 refers to group metadata
       val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), version)
       GroupMetadataKey(version, key.group)
-    } else throw new IllegalStateException(s"Unknown group metadata message version: $version")
+    } else {
+      // Unknown versions may exist when a downgraded coordinator is reading records from the log.
+      warn(s"Found unknown message key version: $version." +
+        s" The downgraded coordinator will ignore this key and corresponding value.")

Review Comment:
   makes sense, will use Unknown instead of Unexpected





[GitHub] [kafka] jeffkbkim commented on pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13511:
URL: https://github.com/apache/kafka/pull/13511#issuecomment-1499126132

   Will re-trigger the build once https://github.com/apache/kafka/pull/13512 is merged. All test failures are related to Streams.




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165928094


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing
+
+                  case unexpectedKey =>
+                    throw new IllegalStateException(s"Unexpected message key $unexpectedKey while loading offsets and group metadata")

Review Comment:
   we need this; without the catch-all case, the compiler warns: `match may not be exhaustive`.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1167009340


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing
+
+                  case unexpectedKey =>
+                    throw new IllegalStateException(s"Unexpected message key $unexpectedKey while loading offsets and group metadata")

Review Comment:
   that works. thanks!





[GitHub] [kafka] dajac commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1163731655


##########
clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java:
##########
@@ -231,4 +231,38 @@ public static byte[] toVersionPrefixedBytes(final short version, final Message m
                 buffer.limit() == buffer.array().length) return buffer.array();
         else return Utils.toArray(buffer);
     }
+
+    // Should only be used for testing
+    public static byte[] messageWithUnknownVersion() {

Review Comment:
   It is a bit awkward to have this in the `src` directory. I am not really sure I understand why we need it either. I suppose that we could use any other `Message`-like record from `test`, couldn't we?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1145,17 +1150,22 @@ object GroupMetadataManager {
    * @param buffer input byte-buffer
    * @return an OffsetKey or GroupMetadataKey object from the message
    */
-  def readMessageKey(buffer: ByteBuffer): BaseKey = {
+  def readMessageKey(buffer: ByteBuffer): Option[BaseKey] = {

Review Comment:
   Have you considered introducing an `UnknownKey` which extends `BaseKey`? The `UnknownKey` could carry the version and even the bytes. It seems that the change would be less disruptive like this.
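A sketch of the suggested shape, under simplifying assumptions: the key hierarchy is a hypothetical stand-in, the payload is a raw UTF-8 string rather than a generated message, and versions 0 and 1 are assumed to be offset-commit keys while version 2 is a group-metadata key. `readMessageKey` keeps its `BaseKey` return type and returns `UnknownKey(version)` for anything else, so existing call sites only need one more `case` instead of unwrapping an `Option`.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical, simplified key hierarchy (not Kafka's classes).
sealed trait BaseKey { def version: Short }
final case class OffsetKey(version: Short, group: String) extends BaseKey
final case class GroupMetadataKey(version: Short, group: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

object ReadKeySketch {
  // Assumed version mapping: 0 and 1 -> offset-commit key, 2 -> group-metadata key.
  def readMessageKey(buffer: ByteBuffer): BaseKey = {
    val version = buffer.getShort()
    // Reads the remaining bytes as the (simplified) key payload.
    def rest(): String = {
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
    if (version == 0 || version == 1) OffsetKey(version, rest())
    else if (version == 2) GroupMetadataKey(version, rest())
    else UnknownKey(version) // no exception: the caller logs and skips it
  }
}
```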





[GitHub] [kafka] dajac commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1166515699


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -576,7 +576,10 @@ class GroupMetadataManager(brokerId: Int,
     }
   }
 
-  private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
+  // Visible for testing
+  private[group] def doLoadGroupsAndOffsets(topicPartition: TopicPartition,

Review Comment:
   nit: It seems that we could revert this change now.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1273,9 +1283,10 @@ object GroupMetadataManager {
       throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)")
     } else {
       GroupMetadataManager.readMessageKey(record.key) match {
-        case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
-        case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value)
-        case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)")
+          case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
+          case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value)
+          case unknownKey: UnknownKey => (Some(s"UNKNOWN(version=${unknownKey.version})"), None)

Review Comment:
   nit: In order to follow the format of the offset and group keys, I wonder if we should use `unknown::version=$version`. What do you think?
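A sketch of the suggested format, using hypothetical stand-in classes: unknown keys render as `unknown::version=$version`, next to a `group_metadata::group=$group` key string that is assumed here only for comparison.

```scala
// Hypothetical stand-ins for illustration (not Kafka's classes).
sealed trait BaseKey
final case class GroupMetadataKey(version: Short, group: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

object FormatSketch {
  // Both branches follow the same `type::field=value` shape.
  def formatKey(key: BaseKey): String = key match {
    case GroupMetadataKey(_, group) => s"group_metadata::group=$group"
    case UnknownKey(version)        => s"unknown::version=$version"
  }
}
```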



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +149,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record
+
+        case unexpectedKey =>
+          throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.")

Review Comment:
   `writeTo` is actually used to write records to the log. It does not make sense to swallow unknown records here. Therefore, we should throw an `IllegalStateException` for unknown records as well. Moreover, the message should be changed, as `while reading transaction log` is not correct.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -167,25 +174,44 @@ object TransactionLog {
    * Exposed for printing records using [[kafka.tools.DumpLogSegments]]
    */
   def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
-    val txnKey = TransactionLog.readTxnRecordKey(record.key)
-    val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
-
-    val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
-      case None => "<DELETE>"
-
-      case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
-        s"producerEpoch:${txnMetadata.producerEpoch}," +
-        s"state=${txnMetadata.state}," +
-        s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
-        s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
-        s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
-    }
+    TransactionLog.readTxnRecordKey(record.key) match {
+      case txnKey: TxnKey =>
+        val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
+
+        val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
+          case None => "<DELETE>"
+
+          case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
+            s"producerEpoch:${txnMetadata.producerEpoch}," +
+            s"state=${txnMetadata.state}," +
+            s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
+            s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
+            s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
+        }
+
+        (Some(keyString), Some(valueString))
+
+      case _: UnknownKey =>
+        (Some("<UNKNOWN>"), Some("<UNKNOWN>"))
 
-    (Some(keyString), Some(valueString))
+      case unexpectedKey =>
+        throw new IllegalStateException(s"Found unexpected key $unexpectedKey while formatting transaction log.")

Review Comment:
   I think that we could remove this one if we use a `sealed trait`.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing
+
+                  case unexpectedKey =>
+                    throw new IllegalStateException(s"Unexpected message key $unexpectedKey while loading offsets and group metadata")

Review Comment:
   I think that we could use a `sealed trait` to fix this. Does it work?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -167,25 +174,44 @@ object TransactionLog {
    * Exposed for printing records using [[kafka.tools.DumpLogSegments]]
    */
   def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
-    val txnKey = TransactionLog.readTxnRecordKey(record.key)
-    val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
-
-    val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
-      case None => "<DELETE>"
-
-      case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
-        s"producerEpoch:${txnMetadata.producerEpoch}," +
-        s"state=${txnMetadata.state}," +
-        s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
-        s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
-        s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
-    }
+    TransactionLog.readTxnRecordKey(record.key) match {
+      case txnKey: TxnKey =>
+        val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
+
+        val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
+          case None => "<DELETE>"
+
+          case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
+            s"producerEpoch:${txnMetadata.producerEpoch}," +
+            s"state=${txnMetadata.state}," +
+            s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
+            s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
+            s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
+        }
+
+        (Some(keyString), Some(valueString))
+
+      case _: UnknownKey =>
+        (Some("<UNKNOWN>"), Some("<UNKNOWN>"))

Review Comment:
   Let's use the same format as we used for the groups.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##########
@@ -1085,4 +1087,41 @@ class TransactionStateManagerTest {
     assertTrue(partitionLoadTime("partition-load-time-max") >= 0)
     assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0)
   }
+

Review Comment:
   nit: There is an extra empty line.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -467,16 +467,26 @@ class TransactionStateManager(brokerId: Int,
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not be null")
-                val txnKey = TransactionLog.readTxnRecordKey(record.key)
-                // load transaction metadata along with transaction state
-                val transactionalId = txnKey.transactionalId
-                TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
-                  case None =>
-                    loadedTransactions.remove(transactionalId)
-                  case Some(txnMetadata) =>
-                    loadedTransactions.put(transactionalId, txnMetadata)
+                TransactionLog.readTxnRecordKey(record.key) match {
+                  case txnKey: TxnKey =>
+                    // load transaction metadata along with transaction state
+                    val transactionalId = txnKey.transactionalId
+                    TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
+                      case None =>
+                        loadedTransactions.remove(transactionalId)
+                      case Some(txnMetadata) =>
+                        loadedTransactions.put(transactionalId, txnMetadata)
+                    }
+                    currOffset = batch.nextOffset
+
+                  case unknownKey: UnknownKey =>
+                    warn(s"Unknown message key with version ${unknownKey.version}" +
+                      s" while loading transaction state. Ignoring it. " +
+                      s"It could be a left over from an aborted upgrade.")
+
+                  case unexpectedKey =>
+                    throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.")

Review Comment:
   I think that we could remove this one if we use a `sealed trait`.
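   A minimal sketch of the `sealed trait` suggestion, using simplified stand-ins for the PR's key types: here `UnknownKey` carries only a version, and `describeKey` is a hypothetical helper, not Kafka code. All subtypes of a sealed trait must be declared in the same file, so the compiler knows the full set of cases. The `case unexpectedKey => throw ...` fallback then becomes unnecessary, and the compiler flags a missing case.

```scala
// Stand-ins for the coordinator key types; not the actual Kafka definitions.
sealed trait BaseKey {
  def version: Short
}

final case class TxnKey(version: Short, transactionalId: String) extends BaseKey

// An unknown key only needs its version; there is no transactional id to decode.
final case class UnknownKey(version: Short) extends BaseKey

// Hypothetical helper: the match is checked for exhaustiveness against the
// sealed hierarchy, so no catch-all `case unexpectedKey => throw ...` is needed.
def describeKey(key: BaseKey): String = key match {
  case TxnKey(_, transactionalId) => s"transactionalId=$transactionalId"
  case UnknownKey(version)        => s"unknown::version=$version"
}
```

   If a third key type is added to the sealed hierarchy later, every match that does not handle it produces a compiler warning. A runtime `IllegalStateException` would not surface until the case is actually hit.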





[GitHub] [kafka] dajac commented on pull request #13511: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, Part-1)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13511:
URL: https://github.com/apache/kafka/pull/13511#issuecomment-1512737414

   Merged to trunk, 3.5 and 3.4. We need to open PRs for older branches.




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165929989


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record
+
+        case unexpectedKey =>
+          throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.")

Review Comment:
   We need this case; otherwise the compiler warns `match may not be exhaustive`.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165769118


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record

Review Comment:
   Should we log or print it to notify the user of the presence of the key?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -167,25 +177,43 @@ object TransactionLog {
    * Exposed for printing records using [[kafka.tools.DumpLogSegments]]
    */
   def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
-    val txnKey = TransactionLog.readTxnRecordKey(record.key)
-    val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
-
-    val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
-      case None => "<DELETE>"
-
-      case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
-        s"producerEpoch:${txnMetadata.producerEpoch}," +
-        s"state=${txnMetadata.state}," +
-        s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
-        s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
-        s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
-    }
+    TransactionLog.readTxnRecordKey(record.key) match {
+      case txnKey: TxnKey =>
+        val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
+
+        val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
+          case None => "<DELETE>"
+
+          case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
+            s"producerEpoch:${txnMetadata.producerEpoch}," +
+            s"state=${txnMetadata.state}," +
+            s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
+            s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
+            s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
+        }
+
+        (Some(keyString), Some(valueString))
+
+      case _: UnknownKey =>
+        (Some("<UNKNOWN>"), Some("<UNKNOWN>"))
 
-    (Some(keyString), Some(valueString))
+      case unexpectedKey =>
+        throw new IllegalStateException(s"Found unexpected key $unexpectedKey while formatting transaction log.")
+    }
   }
 
 }
 
-case class TxnKey(version: Short, transactionalId: String) {
+trait BaseKey{
+  def version: Short
+  def transactionalId: String
+}
+
+case class TxnKey(version: Short, transactionalId: String) extends BaseKey {
   override def toString: String = transactionalId
 }
+
+case class UnknownKey(version: Short, transactionalId: String = null) extends BaseKey {

Review Comment:
   Is the `transactionalId` going to be populated?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1368,3 +1378,8 @@ case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
   override def toString: String = key
 }
 
+case class UnknownKey(version: Short, key: String = null) extends BaseKey {

Review Comment:
   Is the `key` going to be populated?





[GitHub] [kafka] dajac merged pull request #13511: KAFKA-14869: Ignore unknown record types for coordinators (KIP-915, Part-1)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13511:
URL: https://github.com/apache/kafka/pull/13511




[GitHub] [kafka] dajac commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165647641


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing

Review Comment:
   Should we put the warning here instead of having it in `readMessageKey`?
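   A sketch of what moving the warning out of `readMessageKey` could look like, using assumed, simplified types. The decoder only classifies the key by its version prefix and has no side effects. The loading loop decides whether to log and skip. The mapping of version 2 to group metadata mirrors the `// version 2 refers to group metadata` comment in the diff. Treating versions 0 and 1 as offset-commit keys is an assumption of this sketch. `loadKeys` and the `warn` callback are hypothetical stand-ins for the `GroupMetadataManager` loading code and its logger. The message text follows the wording dajac proposes for this warning.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer

sealed trait BaseKey { def version: Short }
final case class OffsetKey(version: Short) extends BaseKey
final case class GroupMetadataKey(version: Short) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

// Side-effect-free decoder: reads the version prefix and classifies the key, without logging.
def readMessageKey(buffer: ByteBuffer): BaseKey = {
  val version = buffer.getShort()
  if (version == 0 || version == 1) OffsetKey(version)
  else if (version == 2) GroupMetadataKey(version)
  else UnknownKey(version)
}

// Hypothetical loading loop: the caller owns the decision to warn and skip.
def loadKeys(records: Seq[ByteBuffer], warn: String => Unit): Seq[BaseKey] = {
  val loaded = ListBuffer.empty[BaseKey]
  records.foreach { record =>
    readMessageKey(record) match {
      case key @ (_: OffsetKey | _: GroupMetadataKey) => loaded += key
      case UnknownKey(version) =>
        warn(s"Unexpected message key with version ($version) while loading offsets " +
          "and group metadata. Ignoring it. It could be a leftover from an aborted upgrade.")
    }
  }
  loaded.toList
}
```

   Keeping the decoder free of logging also lets tools such as a log dumper reuse it without emitting coordinator warnings.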



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing
+
+                  case unexpectedKey =>
+                    throw new IllegalStateException(s"Unexpected message key $unexpectedKey while loading offsets and group metadata")

Review Comment:
   Is this one still required?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1368,3 +1378,8 @@ case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
   override def toString: String = key
 }
 
+case class UnknownKey(version: Short, key: String = null) extends BaseKey {
+

Review Comment:
   nit: I would remove this empty line.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,11 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testReadUnknownMessageKeyVersion(): Unit = {
+    val record = new TransactionLogKey()
+    val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record)
+    TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord))

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1273,9 +1282,10 @@ object GroupMetadataManager {
       throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)")
     } else {
       GroupMetadataManager.readMessageKey(record.key) match {
-        case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
-        case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value)
-        case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)")
+          case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
+          case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value)
+          case _: UnknownKey => (Some("<UNKNOWN>"), Some("<UNKNOWN>"))

Review Comment:
   For the key, could we say `Unknown(version=$version)`? For the value, could we just return `None`?
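   A sketch of the suggested dump output, using simplified stand-in types. For an unknown key, the key string names the version and the value is `None` rather than a placeholder string. `formatKeyAndValue` is a hypothetical stand-in for the PR's `formatRecordKeyAndValue`. The `group=` formatting for known keys and the `String` value, which stands in for decoded offset or group metadata, are illustrative assumptions.

```scala
sealed trait BaseKey { def version: Short }
final case class OffsetKey(version: Short, group: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

// Hypothetical formatter: returns (key string, value string) for a dump tool.
def formatKeyAndValue(key: BaseKey, value: Option[String]): (Option[String], Option[String]) =
  key match {
    case OffsetKey(_, group) => (Some(s"group=$group"), Some(value.getOrElse("<DELETE>")))
    // Unknown keys report their version; the value cannot be decoded, so it is omitted.
    case UnknownKey(version) => (Some(s"Unknown(version=$version)"), None)
  }
```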



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1155,7 +1159,12 @@ object GroupMetadataManager {
       // version 2 refers to group metadata
       val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), version)
       GroupMetadataKey(version, key.group)
-    } else throw new IllegalStateException(s"Unknown group metadata message version: $version")
+    } else {
+      // Unknown versions may exist when a downgraded coordinator is reading records from the log.
+      warn(s"Found unknown message key version: $version." +
+        s" The downgraded coordinator will ignore this key and corresponding value.")

Review Comment:
   `downgraded coordinator` reads a bit weird here. How about: `Unexpected message key with version ($version) while loading offsets and group metadata. Ignoring it.`?
   
   I wonder if we should add a sentence such as `It could be a leftover from an aborted upgrade.` What do you think?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record
+
+        case unexpectedKey =>
+          throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.")

Review Comment:
   Same question about this one.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##########
@@ -758,7 +758,7 @@ class TransactionStateManagerTest {
     appendedRecords.values.foreach { batches =>
       batches.foreach { records =>
         records.records.forEach { record =>
-          val transactionalId = TransactionLog.readTxnRecordKey(record.key).transactionalId
+          val transactionalId = TransactionLog.readTxnRecordKey(record.key).get.transactionalId

Review Comment:
   I am not sure I understand why we need this `get` here.
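   In the `TransactionStateManager` hunk above, `readTxnRecordKey` appears to return a key that is matched on `TxnKey`, not an `Option`. If so, the test can avoid `.get` by matching on the concrete type and failing with a clear message otherwise. This sketch uses simplified stand-in types. `transactionalIdOf` is a hypothetical test helper, not part of the PR.

```scala
sealed trait BaseKey { def version: Short }
final case class TxnKey(version: Short, transactionalId: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

// Hypothetical test helper: extract the id, or fail with a descriptive message.
def transactionalIdOf(key: BaseKey): String = key match {
  case TxnKey(_, transactionalId) => transactionalId
  case UnknownKey(version) =>
    throw new AssertionError(s"Expected a TxnKey but found an unknown key with version $version")
}
```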



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record

Review Comment:
   nit: I would also add the logging here.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -640,8 +640,13 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+
+    // Should ignore unknown record
+    val unknownMessage = MessageUtil.messageWithUnknownVersion()

Review Comment:
   Does this method still exist?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2762,4 +2767,12 @@ class GroupMetadataManagerTest {
     assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
     assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
   }
+
+  @Test
+  def testIgnoreUnknownMessageKeyVersion(): Unit = {
+    val record = new org.apache.kafka.coordinator.group.generated.GroupMetadataKey()
+    val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record)
+    GroupMetadataManager.readMessageKey(ByteBuffer.wrap(unknownRecord))

Review Comment:
   nit: Could we assert the returned value?
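   A self-contained sketch of the requested assertion. It encodes a key with a version the reader does not know, decodes it, and checks that the result is an `UnknownKey` carrying that version. Checking only that no exception escapes would not verify this. `toVersionPrefixedBytes` here is a hypothetical stand-in. It assumes the framing the test relies on from `MessageUtil.toVersionPrefixedBytes`: a two-byte version followed by the serialized key. The decoder is simplified and models only version 2.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

sealed trait BaseKey { def version: Short }
final case class GroupMetadataKey(version: Short, group: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

// Hypothetical framing: 2-byte version prefix followed by the serialized key.
def toVersionPrefixedBytes(version: Short, payload: Array[Byte]): Array[Byte] = {
  val buffer = ByteBuffer.allocate(2 + payload.length)
  buffer.putShort(version)
  buffer.put(payload)
  buffer.array()
}

// Simplified decoder: this sketch only understands version 2; any other version is unknown.
def readMessageKey(buffer: ByteBuffer): BaseKey = {
  val version = buffer.getShort()
  if (version == 2) {
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    GroupMetadataKey(version, new String(bytes, StandardCharsets.UTF_8))
  } else UnknownKey(version)
}
```

   In the JUnit test, the same check would be an `assertEquals` between the expected unknown key and the value returned by `GroupMetadataManager.readMessageKey`.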





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13511: KAFKA-14869: ignore unknown record types for coordinators

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1167257449


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +149,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction record
+
+        case unexpectedKey =>
+          throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.")

Review Comment:
   This is used to write to the output stream in the console consumer. Shouldn't we print `unknown::version=$version` here as well? I agree the log wording needs to change. For now, I have replaced the exception with a print.
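   A sketch of the behaviour proposed in this reply. Instead of throwing, the console formatter writes an `unknown::version=$version` line for keys it cannot decode, so the user can see that such records exist. The key types and the `writeRecord` signature are simplified assumptions. The real `writeTo` takes a `ConsumerRecord[Array[Byte], Array[Byte]]` and decodes the value itself. Only the `transactionalId::metadata` line shape mirrors the existing formatter.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets

sealed trait BaseKey { def version: Short }
final case class TxnKey(version: Short, transactionalId: String) extends BaseKey
final case class UnknownKey(version: Short) extends BaseKey

// Hypothetical, simplified formatter: `value` stands in for the decoded
// transaction metadata (None models a tombstone, printed as "NULL").
def writeRecord(key: BaseKey, value: Option[String], output: PrintStream): Unit = {
  val line = key match {
    case TxnKey(_, transactionalId) =>
      s"$transactionalId::${value.getOrElse("NULL")}"
    case UnknownKey(version) =>
      // Surface the record instead of silently skipping it or failing the tool.
      s"unknown::version=$version"
  }
  output.write(line.getBytes(StandardCharsets.UTF_8))
  output.write("\n".getBytes(StandardCharsets.UTF_8))
}
```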


