You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/21 14:57:09 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

mumrah commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054455358


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -289,6 +316,59 @@ class BrokerMetadataListener(
     BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), numBytes)
   }
 
+  private def applyRecordToDelta(
+    delta: MetadataDelta,
+    messageAndVersion: ApiMessageAndVersion,
+    baseOffset: Long,
+    index: Int,
+    snapshotName: Option[String]
+  ): Unit = {
+    try {
+      delta.replay(messageAndVersion.message())
+    } catch {
+      case e: Throwable => snapshotName match {
+        case None => metadataLoadingFaultHandler.handleFault(
+          s"Error replaying metadata log record at offset ${baseOffset + index}", e)
+        case Some(name) => metadataLoadingFaultHandler.handleFault(
+          s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e)
+      }
+    }
+  }
+
+  private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: Long): Unit = {
+    if (_transactionEpoch != -1) {
+      abortTransaction("a new begin transaction record appeared", newTransactionOffset)
+    }
+    _transactionEpoch = newTransactionEpoch
+    _transactionOffset = newTransactionOffset
+    _transactionRecords = new util.ArrayList[ApiMessageAndVersion]
+    log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, _transactionEpoch)
+  }
+
+  private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = {
+    log.debug("Ending metadata transaction {}_{} at offset {}", _transactionOffset, _transactionEpoch, endOffset)
+    var index = 0
+    _transactionRecords.forEach(record => {

Review Comment:
   We should guard against a null `_transactionRecords` here. If two `EndTransactionRecord`s appeared (somehow), the second one would raise an NPE.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -995,6 +1024,42 @@ private void appendRaftEvent(String name, Runnable runnable) {
         }
     }
 
+    private void beginTransaction(
+        long newTransactionOffset,
+        int newTransactionEpoch
+    ) {
+        if (transactionEpoch != -1) {
+            abortTransaction("a new begin transaction record appeared", newTransactionOffset);
+        }
+        transactionOffset = newTransactionOffset;
+        transactionEpoch = newTransactionEpoch;
+        snapshotRegistry.getOrCreateSnapshot(transactionOffset);
+        log.debug("Beginning metadata transaction {}_{}.", transactionOffset, transactionEpoch);
+    }
+
+    private void endTransaction(long offset) {
+        if (transactionEpoch == -1) {
+            throw fatalFaultHandler.handleFault("Tried to end a transaction at offset " + offset +
+                " but there was no current transaction.");
+        }
+        transactionOffset = -1L;
+        transactionEpoch = -1;
+        log.debug("Completing metadata transaction {}_{}.", transactionOffset, transactionEpoch);
+    }
+
+    private void abortTransaction(String reason, long offset) {

Review Comment:
   Maybe give `offset` a more explicit name?



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -254,6 +270,9 @@ class BrokerMetadataListener(
     while (iterator.hasNext) {
       val batch = iterator.next()
 
+      if (_transactionEpoch != -1 && _transactionEpoch != batch.epoch) {
+        abortTransaction("the log epoch changed to " + batch.epoch, batch.baseOffset)

Review Comment:
   Since we're in Scala here, we can use string interpolation (s-strings), e.g. `s"the log epoch changed to ${batch.epoch}"`.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -860,7 +886,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             int i = 1;
                             for (ApiMessageAndVersion message : messages) {
                                 try {
-                                    replay(message.message(), Optional.empty(), offset);
+                                    replay(message.message(), Optional.empty(), offset + i - 1, epoch);

Review Comment:
   Regarding the `offset` -> `offset + i - 1` change: was the old behavior a bug? Were we just passing the batch base offset to `replay` before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org